drbd: track meta data IO intent, start and submit time
[firefly-linux-kernel-4.4.55.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_io_complete (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   bm_async_io_complete (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58
59 /* About the global_state_lock
60    Each state transition on an device holds a read lock. In case we have
61    to evaluate the resync after dependencies, we grab a write lock, because
62    we need stable states on all devices for that.  */
63 rwlock_t global_state_lock;
64
65 /* used for synchronous meta data and bitmap IO
66  * submitted by drbd_md_sync_page_io()
67  */
68 void drbd_md_io_complete(struct bio *bio, int error)
69 {
70         struct drbd_device *device;
71
72         device = bio->bi_private;
73         device->md_io.error = error;
74
75         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76          * to timeout on the lower level device, and eventually detach from it.
77          * If this io completion runs after that timeout expired, this
78          * drbd_md_put_buffer() may allow us to finally try and re-attach.
79          * During normal operation, this only puts that extra reference
80          * down to 1 again.
81          * Make sure we first drop the reference, and only then signal
82          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83          * next drbd_md_sync_page_io(), that we trigger the
84          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
85          */
86         drbd_md_put_buffer(device);
87         device->md_io.done = 1;
88         wake_up(&device->misc_wait);
89         bio_put(bio);
90         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91                 put_ldev(device);
92 }
93
94 /* reads on behalf of the partner,
95  * "submitted" by the receiver
96  */
97 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
98 {
99         unsigned long flags = 0;
100         struct drbd_peer_device *peer_device = peer_req->peer_device;
101         struct drbd_device *device = peer_device->device;
102
103         spin_lock_irqsave(&device->resource->req_lock, flags);
104         device->read_cnt += peer_req->i.size >> 9;
105         list_del(&peer_req->w.list);
106         if (list_empty(&device->read_ee))
107                 wake_up(&device->ee_wait);
108         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
110         spin_unlock_irqrestore(&device->resource->req_lock, flags);
111
112         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113         put_ldev(device);
114 }
115
116 /* writes on behalf of the partner, or resync writes,
117  * "submitted" by the receiver, final stage.  */
118 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120         unsigned long flags = 0;
121         struct drbd_peer_device *peer_device = peer_req->peer_device;
122         struct drbd_device *device = peer_device->device;
123         struct drbd_interval i;
124         int do_wake;
125         u64 block_id;
126         int do_al_complete_io;
127
128         /* after we moved peer_req to done_ee,
129          * we may no longer access it,
130          * it may be freed/reused already!
131          * (as soon as we release the req_lock) */
132         i = peer_req->i;
133         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134         block_id = peer_req->block_id;
135
136         spin_lock_irqsave(&device->resource->req_lock, flags);
137         device->writ_cnt += peer_req->i.size >> 9;
138         list_move_tail(&peer_req->w.list, &device->done_ee);
139
140         /*
141          * Do not remove from the write_requests tree here: we did not send the
142          * Ack yet and did not wake possibly waiting conflicting requests.
143          * Removed from the tree from "drbd_process_done_ee" within the
144          * appropriate dw.cb (e_end_block/e_end_resync_block) or from
145          * _drbd_clear_done_ee.
146          */
147
148         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
149
150         /* FIXME do we want to detach for failed REQ_DISCARD?
151          * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
152         if (peer_req->flags & EE_WAS_ERROR)
153                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
154         spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156         if (block_id == ID_SYNCER)
157                 drbd_rs_complete_io(device, i.sector);
158
159         if (do_wake)
160                 wake_up(&device->ee_wait);
161
162         if (do_al_complete_io)
163                 drbd_al_complete_io(device, &i);
164
165         wake_asender(peer_device->connection);
166         put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio, int error)
173 {
174         struct drbd_peer_request *peer_req = bio->bi_private;
175         struct drbd_device *device = peer_req->peer_device->device;
176         int uptodate = bio_flagged(bio, BIO_UPTODATE);
177         int is_write = bio_data_dir(bio) == WRITE;
178         int is_discard = !!(bio->bi_rw & REQ_DISCARD);
179
180         if (error && __ratelimit(&drbd_ratelimit_state))
181                 drbd_warn(device, "%s: error=%d s=%llus\n",
182                                 is_write ? (is_discard ? "discard" : "write")
183                                         : "read", error,
184                                 (unsigned long long)peer_req->i.sector);
185         if (!error && !uptodate) {
186                 if (__ratelimit(&drbd_ratelimit_state))
187                         drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
188                                         is_write ? "write" : "read",
189                                         (unsigned long long)peer_req->i.sector);
190                 /* strange behavior of some lower level drivers...
191                  * fail the request by clearing the uptodate flag,
192                  * but do not return any error?! */
193                 error = -EIO;
194         }
195
196         if (error)
197                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
198
199         bio_put(bio); /* no need for the bio anymore */
200         if (atomic_dec_and_test(&peer_req->pending_bios)) {
201                 if (is_write)
202                         drbd_endio_write_sec_final(peer_req);
203                 else
204                         drbd_endio_read_sec_final(peer_req);
205         }
206 }
207
208 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
209  */
210 void drbd_request_endio(struct bio *bio, int error)
211 {
212         unsigned long flags;
213         struct drbd_request *req = bio->bi_private;
214         struct drbd_device *device = req->device;
215         struct bio_and_error m;
216         enum drbd_req_event what;
217         int uptodate = bio_flagged(bio, BIO_UPTODATE);
218
219         if (!error && !uptodate) {
220                 drbd_warn(device, "p %s: setting error to -EIO\n",
221                          bio_data_dir(bio) == WRITE ? "write" : "read");
222                 /* strange behavior of some lower level drivers...
223                  * fail the request by clearing the uptodate flag,
224                  * but do not return any error?! */
225                 error = -EIO;
226         }
227
228
229         /* If this request was aborted locally before,
230          * but now was completed "successfully",
231          * chances are that this caused arbitrary data corruption.
232          *
233          * "aborting" requests, or force-detaching the disk, is intended for
234          * completely blocked/hung local backing devices which do no longer
235          * complete requests at all, not even do error completions.  In this
236          * situation, usually a hard-reset and failover is the only way out.
237          *
238          * By "aborting", basically faking a local error-completion,
239          * we allow for a more graceful swichover by cleanly migrating services.
240          * Still the affected node has to be rebooted "soon".
241          *
242          * By completing these requests, we allow the upper layers to re-use
243          * the associated data pages.
244          *
245          * If later the local backing device "recovers", and now DMAs some data
246          * from disk into the original request pages, in the best case it will
247          * just put random data into unused pages; but typically it will corrupt
248          * meanwhile completely unrelated data, causing all sorts of damage.
249          *
250          * Which means delayed successful completion,
251          * especially for READ requests,
252          * is a reason to panic().
253          *
254          * We assume that a delayed *error* completion is OK,
255          * though we still will complain noisily about it.
256          */
257         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
258                 if (__ratelimit(&drbd_ratelimit_state))
259                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
260
261                 if (!error)
262                         panic("possible random memory corruption caused by delayed completion of aborted local request\n");
263         }
264
265         /* to avoid recursion in __req_mod */
266         if (unlikely(error)) {
267                 if (bio->bi_rw & REQ_DISCARD)
268                         what = (error == -EOPNOTSUPP)
269                                 ? DISCARD_COMPLETED_NOTSUPP
270                                 : DISCARD_COMPLETED_WITH_ERROR;
271                 else
272                         what = (bio_data_dir(bio) == WRITE)
273                         ? WRITE_COMPLETED_WITH_ERROR
274                         : (bio_rw(bio) == READ)
275                           ? READ_COMPLETED_WITH_ERROR
276                           : READ_AHEAD_COMPLETED_WITH_ERROR;
277         } else
278                 what = COMPLETED_OK;
279
280         bio_put(req->private_bio);
281         req->private_bio = ERR_PTR(error);
282
283         /* not req_mod(), we need irqsave here! */
284         spin_lock_irqsave(&device->resource->req_lock, flags);
285         __req_mod(req, what, &m);
286         spin_unlock_irqrestore(&device->resource->req_lock, flags);
287         put_ldev(device);
288
289         if (m.bio)
290                 complete_master_bio(device, &m);
291 }
292
293 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
294 {
295         struct hash_desc desc;
296         struct scatterlist sg;
297         struct page *page = peer_req->pages;
298         struct page *tmp;
299         unsigned len;
300
301         desc.tfm = tfm;
302         desc.flags = 0;
303
304         sg_init_table(&sg, 1);
305         crypto_hash_init(&desc);
306
307         while ((tmp = page_chain_next(page))) {
308                 /* all but the last page will be fully used */
309                 sg_set_page(&sg, page, PAGE_SIZE, 0);
310                 crypto_hash_update(&desc, &sg, sg.length);
311                 page = tmp;
312         }
313         /* and now the last, possibly only partially used page */
314         len = peer_req->i.size & (PAGE_SIZE - 1);
315         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
316         crypto_hash_update(&desc, &sg, sg.length);
317         crypto_hash_final(&desc, digest);
318 }
319
320 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
321 {
322         struct hash_desc desc;
323         struct scatterlist sg;
324         struct bio_vec bvec;
325         struct bvec_iter iter;
326
327         desc.tfm = tfm;
328         desc.flags = 0;
329
330         sg_init_table(&sg, 1);
331         crypto_hash_init(&desc);
332
333         bio_for_each_segment(bvec, bio, iter) {
334                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
335                 crypto_hash_update(&desc, &sg, sg.length);
336         }
337         crypto_hash_final(&desc, digest);
338 }
339
340 /* MAYBE merge common code with w_e_end_ov_req */
341 static int w_e_send_csum(struct drbd_work *w, int cancel)
342 {
343         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
344         struct drbd_peer_device *peer_device = peer_req->peer_device;
345         struct drbd_device *device = peer_device->device;
346         int digest_size;
347         void *digest;
348         int err = 0;
349
350         if (unlikely(cancel))
351                 goto out;
352
353         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
354                 goto out;
355
356         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
357         digest = kmalloc(digest_size, GFP_NOIO);
358         if (digest) {
359                 sector_t sector = peer_req->i.sector;
360                 unsigned int size = peer_req->i.size;
361                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
362                 /* Free peer_req and pages before send.
363                  * In case we block on congestion, we could otherwise run into
364                  * some distributed deadlock, if the other side blocks on
365                  * congestion as well, because our receiver blocks in
366                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
367                 drbd_free_peer_req(device, peer_req);
368                 peer_req = NULL;
369                 inc_rs_pending(device);
370                 err = drbd_send_drequest_csum(peer_device, sector, size,
371                                               digest, digest_size,
372                                               P_CSUM_RS_REQUEST);
373                 kfree(digest);
374         } else {
375                 drbd_err(device, "kmalloc() of digest failed.\n");
376                 err = -ENOMEM;
377         }
378
379 out:
380         if (peer_req)
381                 drbd_free_peer_req(device, peer_req);
382
383         if (unlikely(err))
384                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
385         return err;
386 }
387
388 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
389
390 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
391 {
392         struct drbd_device *device = peer_device->device;
393         struct drbd_peer_request *peer_req;
394
395         if (!get_ldev(device))
396                 return -EIO;
397
398         if (drbd_rs_should_slow_down(device, sector))
399                 goto defer;
400
401         /* GFP_TRY, because if there is no memory available right now, this may
402          * be rescheduled for later. It is "only" background resync, after all. */
403         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
404                                        size, true /* has real payload */, GFP_TRY);
405         if (!peer_req)
406                 goto defer;
407
408         peer_req->w.cb = w_e_send_csum;
409         spin_lock_irq(&device->resource->req_lock);
410         list_add_tail(&peer_req->w.list, &device->read_ee);
411         spin_unlock_irq(&device->resource->req_lock);
412
413         atomic_add(size >> 9, &device->rs_sect_ev);
414         if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
415                 return 0;
416
417         /* If it failed because of ENOMEM, retry should help.  If it failed
418          * because bio_add_page failed (probably broken lower level driver),
419          * retry may or may not help.
420          * If it does not, you may need to force disconnect. */
421         spin_lock_irq(&device->resource->req_lock);
422         list_del(&peer_req->w.list);
423         spin_unlock_irq(&device->resource->req_lock);
424
425         drbd_free_peer_req(device, peer_req);
426 defer:
427         put_ldev(device);
428         return -EAGAIN;
429 }
430
431 int w_resync_timer(struct drbd_work *w, int cancel)
432 {
433         struct drbd_device *device =
434                 container_of(w, struct drbd_device, resync_work);
435
436         switch (device->state.conn) {
437         case C_VERIFY_S:
438                 make_ov_request(device, cancel);
439                 break;
440         case C_SYNC_TARGET:
441                 make_resync_request(device, cancel);
442                 break;
443         }
444
445         return 0;
446 }
447
448 void resync_timer_fn(unsigned long data)
449 {
450         struct drbd_device *device = (struct drbd_device *) data;
451
452         drbd_queue_work_if_unqueued(
453                 &first_peer_device(device)->connection->sender_work,
454                 &device->resync_work);
455 }
456
457 static void fifo_set(struct fifo_buffer *fb, int value)
458 {
459         int i;
460
461         for (i = 0; i < fb->size; i++)
462                 fb->values[i] = value;
463 }
464
465 static int fifo_push(struct fifo_buffer *fb, int value)
466 {
467         int ov;
468
469         ov = fb->values[fb->head_index];
470         fb->values[fb->head_index++] = value;
471
472         if (fb->head_index >= fb->size)
473                 fb->head_index = 0;
474
475         return ov;
476 }
477
478 static void fifo_add_val(struct fifo_buffer *fb, int value)
479 {
480         int i;
481
482         for (i = 0; i < fb->size; i++)
483                 fb->values[i] += value;
484 }
485
486 struct fifo_buffer *fifo_alloc(int fifo_size)
487 {
488         struct fifo_buffer *fb;
489
490         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
491         if (!fb)
492                 return NULL;
493
494         fb->head_index = 0;
495         fb->size = fifo_size;
496         fb->total = 0;
497
498         return fb;
499 }
500
501 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
502 {
503         struct disk_conf *dc;
504         unsigned int want;     /* The number of sectors we want in-flight */
505         int req_sect; /* Number of sectors to request in this turn */
506         int correction; /* Number of sectors more we need in-flight */
507         int cps; /* correction per invocation of drbd_rs_controller() */
508         int steps; /* Number of time steps to plan ahead */
509         int curr_corr;
510         int max_sect;
511         struct fifo_buffer *plan;
512
513         dc = rcu_dereference(device->ldev->disk_conf);
514         plan = rcu_dereference(device->rs_plan_s);
515
516         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
517
518         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
519                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
520         } else { /* normal path */
521                 want = dc->c_fill_target ? dc->c_fill_target :
522                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
523         }
524
525         correction = want - device->rs_in_flight - plan->total;
526
527         /* Plan ahead */
528         cps = correction / steps;
529         fifo_add_val(plan, cps);
530         plan->total += cps * steps;
531
532         /* What we do in this step */
533         curr_corr = fifo_push(plan, 0);
534         plan->total -= curr_corr;
535
536         req_sect = sect_in + curr_corr;
537         if (req_sect < 0)
538                 req_sect = 0;
539
540         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
541         if (req_sect > max_sect)
542                 req_sect = max_sect;
543
544         /*
545         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
546                  sect_in, device->rs_in_flight, want, correction,
547                  steps, cps, device->rs_planed, curr_corr, req_sect);
548         */
549
550         return req_sect;
551 }
552
553 static int drbd_rs_number_requests(struct drbd_device *device)
554 {
555         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
556         int number, mxb;
557
558         sect_in = atomic_xchg(&device->rs_sect_in, 0);
559         device->rs_in_flight -= sect_in;
560
561         rcu_read_lock();
562         mxb = drbd_get_max_buffers(device) / 2;
563         if (rcu_dereference(device->rs_plan_s)->size) {
564                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
565                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
566         } else {
567                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
568                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
569         }
570         rcu_read_unlock();
571
572         /* Don't have more than "max-buffers"/2 in-flight.
573          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
574          * potentially causing a distributed deadlock on congestion during
575          * online-verify or (checksum-based) resync, if max-buffers,
576          * socket buffer sizes and resync rate settings are mis-configured. */
577
578         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
579          * mxb (as used here, and in drbd_alloc_pages on the peer) is
580          * "number of pages" (typically also 4k),
581          * but "rs_in_flight" is in "sectors" (512 Byte). */
582         if (mxb - device->rs_in_flight/8 < number)
583                 number = mxb - device->rs_in_flight/8;
584
585         return number;
586 }
587
588 static int make_resync_request(struct drbd_device *const device, int cancel)
589 {
590         struct drbd_peer_device *const peer_device = first_peer_device(device);
591         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
592         unsigned long bit;
593         sector_t sector;
594         const sector_t capacity = drbd_get_capacity(device->this_bdev);
595         int max_bio_size;
596         int number, rollback_i, size;
597         int align, requeue = 0;
598         int i = 0;
599
600         if (unlikely(cancel))
601                 return 0;
602
603         if (device->rs_total == 0) {
604                 /* empty resync? */
605                 drbd_resync_finished(device);
606                 return 0;
607         }
608
609         if (!get_ldev(device)) {
610                 /* Since we only need to access device->rsync a
611                    get_ldev_if_state(device,D_FAILED) would be sufficient, but
612                    to continue resync with a broken disk makes no sense at
613                    all */
614                 drbd_err(device, "Disk broke down during resync!\n");
615                 return 0;
616         }
617
618         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
619         number = drbd_rs_number_requests(device);
620         if (number <= 0)
621                 goto requeue;
622
623         for (i = 0; i < number; i++) {
624                 /* Stop generating RS requests when half of the send buffer is filled,
625                  * but notify TCP that we'd like to have more space. */
626                 mutex_lock(&connection->data.mutex);
627                 if (connection->data.socket) {
628                         struct sock *sk = connection->data.socket->sk;
629                         int queued = sk->sk_wmem_queued;
630                         int sndbuf = sk->sk_sndbuf;
631                         if (queued > sndbuf / 2) {
632                                 requeue = 1;
633                                 if (sk->sk_socket)
634                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
635                         }
636                 } else
637                         requeue = 1;
638                 mutex_unlock(&connection->data.mutex);
639                 if (requeue)
640                         goto requeue;
641
642 next_sector:
643                 size = BM_BLOCK_SIZE;
644                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
645
646                 if (bit == DRBD_END_OF_BITMAP) {
647                         device->bm_resync_fo = drbd_bm_bits(device);
648                         put_ldev(device);
649                         return 0;
650                 }
651
652                 sector = BM_BIT_TO_SECT(bit);
653
654                 if (drbd_rs_should_slow_down(device, sector) ||
655                     drbd_try_rs_begin_io(device, sector)) {
656                         device->bm_resync_fo = bit;
657                         goto requeue;
658                 }
659                 device->bm_resync_fo = bit + 1;
660
661                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
662                         drbd_rs_complete_io(device, sector);
663                         goto next_sector;
664                 }
665
666 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
667                 /* try to find some adjacent bits.
668                  * we stop if we have already the maximum req size.
669                  *
670                  * Additionally always align bigger requests, in order to
671                  * be prepared for all stripe sizes of software RAIDs.
672                  */
673                 align = 1;
674                 rollback_i = i;
675                 while (i < number) {
676                         if (size + BM_BLOCK_SIZE > max_bio_size)
677                                 break;
678
679                         /* Be always aligned */
680                         if (sector & ((1<<(align+3))-1))
681                                 break;
682
683                         /* do not cross extent boundaries */
684                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
685                                 break;
686                         /* now, is it actually dirty, after all?
687                          * caution, drbd_bm_test_bit is tri-state for some
688                          * obscure reason; ( b == 0 ) would get the out-of-band
689                          * only accidentally right because of the "oddly sized"
690                          * adjustment below */
691                         if (drbd_bm_test_bit(device, bit+1) != 1)
692                                 break;
693                         bit++;
694                         size += BM_BLOCK_SIZE;
695                         if ((BM_BLOCK_SIZE << align) <= size)
696                                 align++;
697                         i++;
698                 }
699                 /* if we merged some,
700                  * reset the offset to start the next drbd_bm_find_next from */
701                 if (size > BM_BLOCK_SIZE)
702                         device->bm_resync_fo = bit + 1;
703 #endif
704
705                 /* adjust very last sectors, in case we are oddly sized */
706                 if (sector + (size>>9) > capacity)
707                         size = (capacity-sector)<<9;
708
709                 if (device->use_csums) {
710                         switch (read_for_csum(peer_device, sector, size)) {
711                         case -EIO: /* Disk failure */
712                                 put_ldev(device);
713                                 return -EIO;
714                         case -EAGAIN: /* allocation failed, or ldev busy */
715                                 drbd_rs_complete_io(device, sector);
716                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
717                                 i = rollback_i;
718                                 goto requeue;
719                         case 0:
720                                 /* everything ok */
721                                 break;
722                         default:
723                                 BUG();
724                         }
725                 } else {
726                         int err;
727
728                         inc_rs_pending(device);
729                         err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
730                                                  sector, size, ID_SYNCER);
731                         if (err) {
732                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
733                                 dec_rs_pending(device);
734                                 put_ldev(device);
735                                 return err;
736                         }
737                 }
738         }
739
740         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
741                 /* last syncer _request_ was sent,
742                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
743                  * next sync group will resume), as soon as we receive the last
744                  * resync data block, and the last bit is cleared.
745                  * until then resync "work" is "inactive" ...
746                  */
747                 put_ldev(device);
748                 return 0;
749         }
750
751  requeue:
752         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
753         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
754         put_ldev(device);
755         return 0;
756 }
757
758 static int make_ov_request(struct drbd_device *device, int cancel)
759 {
760         int number, i, size;
761         sector_t sector;
762         const sector_t capacity = drbd_get_capacity(device->this_bdev);
763         bool stop_sector_reached = false;
764
765         if (unlikely(cancel))
766                 return 1;
767
768         number = drbd_rs_number_requests(device);
769
770         sector = device->ov_position;
771         for (i = 0; i < number; i++) {
772                 if (sector >= capacity)
773                         return 1;
774
775                 /* We check for "finished" only in the reply path:
776                  * w_e_end_ov_reply().
777                  * We need to send at least one request out. */
778                 stop_sector_reached = i > 0
779                         && verify_can_do_stop_sector(device)
780                         && sector >= device->ov_stop_sector;
781                 if (stop_sector_reached)
782                         break;
783
784                 size = BM_BLOCK_SIZE;
785
786                 if (drbd_rs_should_slow_down(device, sector) ||
787                     drbd_try_rs_begin_io(device, sector)) {
788                         device->ov_position = sector;
789                         goto requeue;
790                 }
791
792                 if (sector + (size>>9) > capacity)
793                         size = (capacity-sector)<<9;
794
795                 inc_rs_pending(device);
796                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
797                         dec_rs_pending(device);
798                         return 0;
799                 }
800                 sector += BM_SECT_PER_BIT;
801         }
802         device->ov_position = sector;
803
804  requeue:
805         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
806         if (i == 0 || !stop_sector_reached)
807                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
808         return 1;
809 }
810
811 int w_ov_finished(struct drbd_work *w, int cancel)
812 {
813         struct drbd_device_work *dw =
814                 container_of(w, struct drbd_device_work, w);
815         struct drbd_device *device = dw->device;
816         kfree(dw);
817         ov_out_of_sync_print(device);
818         drbd_resync_finished(device);
819
820         return 0;
821 }
822
823 static int w_resync_finished(struct drbd_work *w, int cancel)
824 {
825         struct drbd_device_work *dw =
826                 container_of(w, struct drbd_device_work, w);
827         struct drbd_device *device = dw->device;
828         kfree(dw);
829
830         drbd_resync_finished(device);
831
832         return 0;
833 }
834
835 static void ping_peer(struct drbd_device *device)
836 {
837         struct drbd_connection *connection = first_peer_device(device)->connection;
838
839         clear_bit(GOT_PING_ACK, &connection->flags);
840         request_ping(connection);
841         wait_event(connection->ping_wait,
842                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
843 }
844
845 int drbd_resync_finished(struct drbd_device *device)
846 {
847         unsigned long db, dt, dbdt;
848         unsigned long n_oos;
849         union drbd_state os, ns;
850         struct drbd_device_work *dw;
851         char *khelper_cmd = NULL;
852         int verify_done = 0;
853
854         /* Remove all elements from the resync LRU. Since future actions
855          * might set bits in the (main) bitmap, then the entries in the
856          * resync LRU would be wrong. */
857         if (drbd_rs_del_all(device)) {
858                 /* In case this is not possible now, most probably because
859                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
860                  * queue (or even the read operations for those packets
861                  * is not finished by now).   Retry in 100ms. */
862
863                 schedule_timeout_interruptible(HZ / 10);
864                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
865                 if (dw) {
866                         dw->w.cb = w_resync_finished;
867                         dw->device = device;
868                         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
869                                         &dw->w);
870                         return 1;
871                 }
872                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
873         }
874
875         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
876         if (dt <= 0)
877                 dt = 1;
878
879         db = device->rs_total;
880         /* adjust for verify start and stop sectors, respective reached position */
881         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
882                 db -= device->ov_left;
883
884         dbdt = Bit2KB(db/dt);
885         device->rs_paused /= HZ;
886
887         if (!get_ldev(device))
888                 goto out;
889
890         ping_peer(device);
891
892         spin_lock_irq(&device->resource->req_lock);
893         os = drbd_read_state(device);
894
895         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
896
897         /* This protects us against multiple calls (that can happen in the presence
898            of application IO), and against connectivity loss just before we arrive here. */
899         if (os.conn <= C_CONNECTED)
900                 goto out_unlock;
901
902         ns = os;
903         ns.conn = C_CONNECTED;
904
905         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
906              verify_done ? "Online verify" : "Resync",
907              dt + device->rs_paused, device->rs_paused, dbdt);
908
909         n_oos = drbd_bm_total_weight(device);
910
911         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
912                 if (n_oos) {
913                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
914                               n_oos, Bit2KB(1));
915                         khelper_cmd = "out-of-sync";
916                 }
917         } else {
918                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
919
920                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
921                         khelper_cmd = "after-resync-target";
922
923                 if (device->use_csums && device->rs_total) {
924                         const unsigned long s = device->rs_same_csum;
925                         const unsigned long t = device->rs_total;
926                         const int ratio =
927                                 (t == 0)     ? 0 :
928                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
929                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
930                              "transferred %luK total %luK\n",
931                              ratio,
932                              Bit2KB(device->rs_same_csum),
933                              Bit2KB(device->rs_total - device->rs_same_csum),
934                              Bit2KB(device->rs_total));
935                 }
936         }
937
938         if (device->rs_failed) {
939                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
940
941                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
942                         ns.disk = D_INCONSISTENT;
943                         ns.pdsk = D_UP_TO_DATE;
944                 } else {
945                         ns.disk = D_UP_TO_DATE;
946                         ns.pdsk = D_INCONSISTENT;
947                 }
948         } else {
949                 ns.disk = D_UP_TO_DATE;
950                 ns.pdsk = D_UP_TO_DATE;
951
952                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
953                         if (device->p_uuid) {
954                                 int i;
955                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
956                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
957                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
958                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
959                         } else {
960                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
961                         }
962                 }
963
964                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
965                         /* for verify runs, we don't update uuids here,
966                          * so there would be nothing to report. */
967                         drbd_uuid_set_bm(device, 0UL);
968                         drbd_print_uuids(device, "updated UUIDs");
969                         if (device->p_uuid) {
970                                 /* Now the two UUID sets are equal, update what we
971                                  * know of the peer. */
972                                 int i;
973                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
974                                         device->p_uuid[i] = device->ldev->md.uuid[i];
975                         }
976                 }
977         }
978
979         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
980 out_unlock:
981         spin_unlock_irq(&device->resource->req_lock);
982         put_ldev(device);
983 out:
984         device->rs_total  = 0;
985         device->rs_failed = 0;
986         device->rs_paused = 0;
987
988         /* reset start sector, if we reached end of device */
989         if (verify_done && device->ov_left == 0)
990                 device->ov_start_sector = 0;
991
992         drbd_md_sync(device);
993
994         if (khelper_cmd)
995                 drbd_khelper(device, khelper_cmd);
996
997         return 1;
998 }
999
1000 /* helper */
1001 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1002 {
1003         if (drbd_peer_req_has_active_page(peer_req)) {
1004                 /* This might happen if sendpage() has not finished */
1005                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1006                 atomic_add(i, &device->pp_in_use_by_net);
1007                 atomic_sub(i, &device->pp_in_use);
1008                 spin_lock_irq(&device->resource->req_lock);
1009                 list_add_tail(&peer_req->w.list, &device->net_ee);
1010                 spin_unlock_irq(&device->resource->req_lock);
1011                 wake_up(&drbd_pp_wait);
1012         } else
1013                 drbd_free_peer_req(device, peer_req);
1014 }
1015
1016 /**
1017  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1018  * @device:     DRBD device.
1019  * @w:          work object.
1020  * @cancel:     The connection will be closed anyways
1021  */
1022 int w_e_end_data_req(struct drbd_work *w, int cancel)
1023 {
1024         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1025         struct drbd_peer_device *peer_device = peer_req->peer_device;
1026         struct drbd_device *device = peer_device->device;
1027         int err;
1028
1029         if (unlikely(cancel)) {
1030                 drbd_free_peer_req(device, peer_req);
1031                 dec_unacked(device);
1032                 return 0;
1033         }
1034
1035         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1036                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1037         } else {
1038                 if (__ratelimit(&drbd_ratelimit_state))
1039                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1040                             (unsigned long long)peer_req->i.sector);
1041
1042                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1043         }
1044
1045         dec_unacked(device);
1046
1047         move_to_net_ee_or_free(device, peer_req);
1048
1049         if (unlikely(err))
1050                 drbd_err(device, "drbd_send_block() failed\n");
1051         return err;
1052 }
1053
1054 /**
1055  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1056  * @w:          work object.
1057  * @cancel:     The connection will be closed anyways
1058  */
1059 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1060 {
1061         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1062         struct drbd_peer_device *peer_device = peer_req->peer_device;
1063         struct drbd_device *device = peer_device->device;
1064         int err;
1065
1066         if (unlikely(cancel)) {
1067                 drbd_free_peer_req(device, peer_req);
1068                 dec_unacked(device);
1069                 return 0;
1070         }
1071
1072         if (get_ldev_if_state(device, D_FAILED)) {
1073                 drbd_rs_complete_io(device, peer_req->i.sector);
1074                 put_ldev(device);
1075         }
1076
1077         if (device->state.conn == C_AHEAD) {
1078                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1079         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1080                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1081                         inc_rs_pending(device);
1082                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1083                 } else {
1084                         if (__ratelimit(&drbd_ratelimit_state))
1085                                 drbd_err(device, "Not sending RSDataReply, "
1086                                     "partner DISKLESS!\n");
1087                         err = 0;
1088                 }
1089         } else {
1090                 if (__ratelimit(&drbd_ratelimit_state))
1091                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1092                             (unsigned long long)peer_req->i.sector);
1093
1094                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1095
1096                 /* update resync data with failure */
1097                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1098         }
1099
1100         dec_unacked(device);
1101
1102         move_to_net_ee_or_free(device, peer_req);
1103
1104         if (unlikely(err))
1105                 drbd_err(device, "drbd_send_block() failed\n");
1106         return err;
1107 }
1108
1109 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1110 {
1111         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1112         struct drbd_peer_device *peer_device = peer_req->peer_device;
1113         struct drbd_device *device = peer_device->device;
1114         struct digest_info *di;
1115         int digest_size;
1116         void *digest = NULL;
1117         int err, eq = 0;
1118
1119         if (unlikely(cancel)) {
1120                 drbd_free_peer_req(device, peer_req);
1121                 dec_unacked(device);
1122                 return 0;
1123         }
1124
1125         if (get_ldev(device)) {
1126                 drbd_rs_complete_io(device, peer_req->i.sector);
1127                 put_ldev(device);
1128         }
1129
1130         di = peer_req->digest;
1131
1132         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1133                 /* quick hack to try to avoid a race against reconfiguration.
1134                  * a real fix would be much more involved,
1135                  * introducing more locking mechanisms */
1136                 if (peer_device->connection->csums_tfm) {
1137                         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1138                         D_ASSERT(device, digest_size == di->digest_size);
1139                         digest = kmalloc(digest_size, GFP_NOIO);
1140                 }
1141                 if (digest) {
1142                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1143                         eq = !memcmp(digest, di->digest, digest_size);
1144                         kfree(digest);
1145                 }
1146
1147                 if (eq) {
1148                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1149                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1150                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1151                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1152                 } else {
1153                         inc_rs_pending(device);
1154                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1155                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1156                         kfree(di);
1157                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1158                 }
1159         } else {
1160                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1161                 if (__ratelimit(&drbd_ratelimit_state))
1162                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1163         }
1164
1165         dec_unacked(device);
1166         move_to_net_ee_or_free(device, peer_req);
1167
1168         if (unlikely(err))
1169                 drbd_err(device, "drbd_send_block/ack() failed\n");
1170         return err;
1171 }
1172
1173 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1174 {
1175         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1176         struct drbd_peer_device *peer_device = peer_req->peer_device;
1177         struct drbd_device *device = peer_device->device;
1178         sector_t sector = peer_req->i.sector;
1179         unsigned int size = peer_req->i.size;
1180         int digest_size;
1181         void *digest;
1182         int err = 0;
1183
1184         if (unlikely(cancel))
1185                 goto out;
1186
1187         digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1188         digest = kmalloc(digest_size, GFP_NOIO);
1189         if (!digest) {
1190                 err = 1;        /* terminate the connection in case the allocation failed */
1191                 goto out;
1192         }
1193
1194         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1195                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1196         else
1197                 memset(digest, 0, digest_size);
1198
1199         /* Free e and pages before send.
1200          * In case we block on congestion, we could otherwise run into
1201          * some distributed deadlock, if the other side blocks on
1202          * congestion as well, because our receiver blocks in
1203          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1204         drbd_free_peer_req(device, peer_req);
1205         peer_req = NULL;
1206         inc_rs_pending(device);
1207         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1208         if (err)
1209                 dec_rs_pending(device);
1210         kfree(digest);
1211
1212 out:
1213         if (peer_req)
1214                 drbd_free_peer_req(device, peer_req);
1215         dec_unacked(device);
1216         return err;
1217 }
1218
1219 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1220 {
1221         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1222                 device->ov_last_oos_size += size>>9;
1223         } else {
1224                 device->ov_last_oos_start = sector;
1225                 device->ov_last_oos_size = size>>9;
1226         }
1227         drbd_set_out_of_sync(device, sector, size);
1228 }
1229
1230 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1231 {
1232         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1233         struct drbd_peer_device *peer_device = peer_req->peer_device;
1234         struct drbd_device *device = peer_device->device;
1235         struct digest_info *di;
1236         void *digest;
1237         sector_t sector = peer_req->i.sector;
1238         unsigned int size = peer_req->i.size;
1239         int digest_size;
1240         int err, eq = 0;
1241         bool stop_sector_reached = false;
1242
1243         if (unlikely(cancel)) {
1244                 drbd_free_peer_req(device, peer_req);
1245                 dec_unacked(device);
1246                 return 0;
1247         }
1248
1249         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1250          * the resync lru has been cleaned up already */
1251         if (get_ldev(device)) {
1252                 drbd_rs_complete_io(device, peer_req->i.sector);
1253                 put_ldev(device);
1254         }
1255
1256         di = peer_req->digest;
1257
1258         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1259                 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1260                 digest = kmalloc(digest_size, GFP_NOIO);
1261                 if (digest) {
1262                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1263
1264                         D_ASSERT(device, digest_size == di->digest_size);
1265                         eq = !memcmp(digest, di->digest, digest_size);
1266                         kfree(digest);
1267                 }
1268         }
1269
1270         /* Free peer_req and pages before send.
1271          * In case we block on congestion, we could otherwise run into
1272          * some distributed deadlock, if the other side blocks on
1273          * congestion as well, because our receiver blocks in
1274          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275         drbd_free_peer_req(device, peer_req);
1276         if (!eq)
1277                 drbd_ov_out_of_sync_found(device, sector, size);
1278         else
1279                 ov_out_of_sync_print(device);
1280
1281         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1282                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1283
1284         dec_unacked(device);
1285
1286         --device->ov_left;
1287
1288         /* let's advance progress step marks only for every other megabyte */
1289         if ((device->ov_left & 0x200) == 0x200)
1290                 drbd_advance_rs_marks(device, device->ov_left);
1291
1292         stop_sector_reached = verify_can_do_stop_sector(device) &&
1293                 (sector + (size>>9)) >= device->ov_stop_sector;
1294
1295         if (device->ov_left == 0 || stop_sector_reached) {
1296                 ov_out_of_sync_print(device);
1297                 drbd_resync_finished(device);
1298         }
1299
1300         return err;
1301 }
1302
1303 /* FIXME
1304  * We need to track the number of pending barrier acks,
1305  * and to be able to wait for them.
1306  * See also comment in drbd_adm_attach before drbd_suspend_io.
1307  */
1308 static int drbd_send_barrier(struct drbd_connection *connection)
1309 {
1310         struct p_barrier *p;
1311         struct drbd_socket *sock;
1312
1313         sock = &connection->data;
1314         p = conn_prepare_command(connection, sock);
1315         if (!p)
1316                 return -EIO;
1317         p->barrier = connection->send.current_epoch_nr;
1318         p->pad = 0;
1319         connection->send.current_epoch_writes = 0;
1320
1321         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1322 }
1323
1324 int w_send_write_hint(struct drbd_work *w, int cancel)
1325 {
1326         struct drbd_device *device =
1327                 container_of(w, struct drbd_device, unplug_work);
1328         struct drbd_socket *sock;
1329
1330         if (cancel)
1331                 return 0;
1332         sock = &first_peer_device(device)->connection->data;
1333         if (!drbd_prepare_command(first_peer_device(device), sock))
1334                 return -EIO;
1335         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1336 }
1337
1338 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1339 {
1340         if (!connection->send.seen_any_write_yet) {
1341                 connection->send.seen_any_write_yet = true;
1342                 connection->send.current_epoch_nr = epoch;
1343                 connection->send.current_epoch_writes = 0;
1344         }
1345 }
1346
1347 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1348 {
1349         /* re-init if first write on this connection */
1350         if (!connection->send.seen_any_write_yet)
1351                 return;
1352         if (connection->send.current_epoch_nr != epoch) {
1353                 if (connection->send.current_epoch_writes)
1354                         drbd_send_barrier(connection);
1355                 connection->send.current_epoch_nr = epoch;
1356         }
1357 }
1358
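/**
 * w_send_out_of_sync() - Worker callback to send a P_OUT_OF_SYNC packet while in AHEAD mode
 * @w:          work object.
 * @cancel:     The connection will be closed anyway
 */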
1359 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1360 {
1361         struct drbd_request *req = container_of(w, struct drbd_request, w);
1362         struct drbd_device *device = req->device;
1363         struct drbd_peer_device *const peer_device = first_peer_device(device);
1364         struct drbd_connection *const connection = peer_device->connection;
1365         int err;
1366
1367         if (unlikely(cancel)) {
1368                 req_mod(req, SEND_CANCELED);
1369                 return 0;
1370         }
1371
1372         /* this time, no connection->send.current_epoch_writes++;
1373          * If it was sent, it was the closing barrier for the last
1374          * replicated epoch, before we went into AHEAD mode.
1375          * No more barriers will be sent, until we leave AHEAD mode again. */
1376         maybe_send_barrier(connection, req->epoch);
1377
1378         err = drbd_send_out_of_sync(peer_device, req);
1379         req_mod(req, OOS_HANDED_TO_NETWORK);
1380
1381         return err;
1382 }
1383
1384 /**
1385  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1386  * @w:          work object.
1387  * @cancel:     The connection will be closed anyway
1388  */
1389 int w_send_dblock(struct drbd_work *w, int cancel)
1390 {
1391         struct drbd_request *req = container_of(w, struct drbd_request, w);
1392         struct drbd_device *device = req->device;
1393         struct drbd_peer_device *const peer_device = first_peer_device(device);
1394         struct drbd_connection *connection = peer_device->connection;
1395         int err;
1396
1397         if (unlikely(cancel)) {
1398                 req_mod(req, SEND_CANCELED);
1399                 return 0;
1400         }
1401
1402         re_init_if_first_write(connection, req->epoch);
1403         maybe_send_barrier(connection, req->epoch);
1404         connection->send.current_epoch_writes++;
1405
1406         err = drbd_send_dblock(peer_device, req);
1407         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1408
1409         return err;
1410 }
1411
1412 /**
1413  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1414  * @w:          work object.
1415  * @cancel:     The connection will be closed anyway
1416  */
1417 int w_send_read_req(struct drbd_work *w, int cancel)
1418 {
1419         struct drbd_request *req = container_of(w, struct drbd_request, w);
1420         struct drbd_device *device = req->device;
1421         struct drbd_peer_device *const peer_device = first_peer_device(device);
1422         struct drbd_connection *connection = peer_device->connection;
1423         int err;
1424
1425         if (unlikely(cancel)) {
1426                 req_mod(req, SEND_CANCELED);
1427                 return 0;
1428         }
1429
1430         /* Even read requests may close a write epoch,
1431          * if one was already open. */
1432         maybe_send_barrier(connection, req->epoch);
1433
1434         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1435                                  (unsigned long)req);
1436
1437         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1438
1439         return err;
1440 }
1441
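/* Worker callback to restart local disk IO for a request: re-enter the
 * activity log for writes, rebuild the private bio from the master bio,
 * and submit it to the backing device again. */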
1442 int w_restart_disk_io(struct drbd_work *w, int cancel)
1443 {
1444         struct drbd_request *req = container_of(w, struct drbd_request, w);
1445         struct drbd_device *device = req->device;
1446
1447         if (bio_data_dir(req->master_bio) == WRITE && (req->rq_state & RQ_IN_ACT_LOG))
1448                 drbd_al_begin_io(device, &req->i);
1449
1450         drbd_req_make_private_bio(req, req->master_bio);
1451         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1452         generic_make_request(req->private_bio);
1453
1454         return 0;
1455 }
1456
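/* Walk the resync-after dependency chain.  Returns 1 if this device may
 * resync now, 0 if some device it depends on is itself resyncing or has
 * its resync paused. */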
1457 static int _drbd_may_sync_now(struct drbd_device *device)
1458 {
1459         struct drbd_device *odev = device;
1460         int resync_after;
1461
1462         while (1) {
1463                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1464                         return 1;
1465                 rcu_read_lock();
1466                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1467                 rcu_read_unlock();
1468                 if (resync_after == -1)
1469                         return 1;
1470                 odev = minor_to_device(resync_after);
1471                 if (!odev)
1472                         return 1;
1473                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1474                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1475                     odev->state.aftr_isp || odev->state.peer_isp ||
1476                     odev->state.user_isp)
1477                         return 0;
1478         }
1479 }
1480
1481 /**
1482  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1483  * @device:     DRBD device.
1484  *
1485  * Called from process context only (admin command and after_state_ch).
1486  */
1487 static int _drbd_pause_after(struct drbd_device *device)
1488 {
1489         struct drbd_device *odev;
1490         int i, rv = 0;
1491
1492         rcu_read_lock();
1493         idr_for_each_entry(&drbd_devices, odev, i) {
1494                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1495                         continue;
1496                 if (!_drbd_may_sync_now(odev))
1497                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1498                                != SS_NOTHING_TO_DO);
1499         }
1500         rcu_read_unlock();
1501
1502         return rv;
1503 }
1504
1505 /**
1506  * _drbd_resume_next() - Resume resync on all devices that may resync now
1507  * @device:     DRBD device.
1508  *
1509  * Called from process context only (admin command and worker).
1510  */
1511 static int _drbd_resume_next(struct drbd_device *device)
1512 {
1513         struct drbd_device *odev;
1514         int i, rv = 0;
1515
1516         rcu_read_lock();
1517         idr_for_each_entry(&drbd_devices, odev, i) {
1518                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1519                         continue;
1520                 if (odev->state.aftr_isp) {
1521                         if (_drbd_may_sync_now(odev))
1522                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1523                                                         CS_HARD, NULL)
1524                                        != SS_NOTHING_TO_DO);
1525                 }
1526         }
1527         rcu_read_unlock();
1528         return rv;
1529 }
1530
1531 void resume_next_sg(struct drbd_device *device)
1532 {
1533         write_lock_irq(&global_state_lock);
1534         _drbd_resume_next(device);
1535         write_unlock_irq(&global_state_lock);
1536 }
1537
1538 void suspend_other_sg(struct drbd_device *device)
1539 {
1540         write_lock_irq(&global_state_lock);
1541         _drbd_pause_after(device);
1542         write_unlock_irq(&global_state_lock);
1543 }
1544
1545 /* caller must hold global_state_lock */
1546 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1547 {
1548         struct drbd_device *odev;
1549         int resync_after;
1550
1551         if (o_minor == -1)
1552                 return NO_ERROR;
1553         if (o_minor < -1 || o_minor > MINORMASK)
1554                 return ERR_RESYNC_AFTER;
1555
1556         /* check for loops */
1557         odev = minor_to_device(o_minor);
1558         while (1) {
1559                 if (odev == device)
1560                         return ERR_RESYNC_AFTER_CYCLE;
1561
1562                 /* You are free to depend on diskless, non-existing,
1563                  * or not yet/no longer existing minors.
1564                  * We only reject dependency loops.
1565                  * We cannot follow the dependency chain beyond a detached or
1566                  * missing minor.
1567                  */
1568                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1569                         return NO_ERROR;
1570
1571                 rcu_read_lock();
1572                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1573                 rcu_read_unlock();
1574                 /* dependency chain ends here, no cycles. */
1575                 if (resync_after == -1)
1576                         return NO_ERROR;
1577
1578                 /* follow the dependency chain */
1579                 odev = minor_to_device(resync_after);
1580         }
1581 }
1582
1583 /* caller must hold global_state_lock */
1584 void drbd_resync_after_changed(struct drbd_device *device)
1585 {
1586         int changes;
1587
1588         do {
1589                 changes  = _drbd_pause_after(device);
1590                 changes |= _drbd_resume_next(device);
1591         } while (changes);
1592 }
1593
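/* Reset the dynamic resync rate controller: clear the resync sector and
 * event counters and empty the fifo holding the resync plan. */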
1594 void drbd_rs_controller_reset(struct drbd_device *device)
1595 {
1596         struct fifo_buffer *plan;
1597
1598         atomic_set(&device->rs_sect_in, 0);
1599         atomic_set(&device->rs_sect_ev, 0);
1600         device->rs_in_flight = 0;
1601
1602         /* Updating the RCU protected object in place is necessary since
1603            this function gets called from atomic context.
1604            It is valid since all other updates also lead to a completely
1605            empty fifo */
1606         rcu_read_lock();
1607         plan = rcu_dereference(device->rs_plan_s);
1608         plan->total = 0;
1609         fifo_set(plan, 0);
1610         rcu_read_unlock();
1611 }
1612
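/* Timer callback: hand the actual resync start over to the worker. */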
1613 void start_resync_timer_fn(unsigned long data)
1614 {
1615         struct drbd_device *device = (struct drbd_device *) data;
1616         drbd_device_post_work(device, RS_START);
1617 }
1618
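/* Handles the RS_START device work bit: if unacked or resync-pending counts
 * are still nonzero, re-arm the timer and retry shortly; otherwise start the
 * resync as C_SYNC_SOURCE and clear the AHEAD_TO_SYNC_SOURCE flag. */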
1619 static void do_start_resync(struct drbd_device *device)
1620 {
1621         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1622                 drbd_warn(device, "postponing start_resync ...\n");
1623                 device->start_resync_timer.expires = jiffies + HZ/10;
1624                 add_timer(&device->start_resync_timer);
1625                 return;
1626         }
1627
1628         drbd_start_resync(device, C_SYNC_SOURCE);
1629         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1630 }
1631
1632 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1633 {
1634         bool csums_after_crash_only;
1635         rcu_read_lock();
1636         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1637         rcu_read_unlock();
1638         return connection->agreed_pro_version >= 89 &&          /* supported? */
1639                 connection->csums_tfm &&                        /* configured? */
1640                 (csums_after_crash_only == 0                    /* use for each resync? */
1641                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1642 }
1643
1644 /**
1645  * drbd_start_resync() - Start the resync process
1646  * @device:     DRBD device.
1647  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1648  *
1649  * This function might bring you directly into one of the
1650  * C_PAUSED_SYNC_* states.
1651  */
1652 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1653 {
1654         struct drbd_peer_device *peer_device = first_peer_device(device);
1655         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1656         union drbd_state ns;
1657         int r;
1658
1659         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1660                 drbd_err(device, "Resync already running!\n");
1661                 return;
1662         }
1663
1664         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1665                 if (side == C_SYNC_TARGET) {
1666                         /* Since application IO was locked out during C_WF_BITMAP_T and
1667                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1668                            which may make the data inconsistent, give the handler a chance to abort. */
1669                         r = drbd_khelper(device, "before-resync-target");
1670                         r = (r >> 8) & 0xff;
1671                         if (r > 0) {
1672                                 drbd_info(device, "before-resync-target handler returned %d, "
1673                                          "dropping connection.\n", r);
1674                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1675                                 return;
1676                         }
1677                 } else /* C_SYNC_SOURCE */ {
1678                         r = drbd_khelper(device, "before-resync-source");
1679                         r = (r >> 8) & 0xff;
1680                         if (r > 0) {
1681                                 if (r == 3) {
1682                                         drbd_info(device, "before-resync-source handler returned %d, "
1683                                                  "ignoring. Old userland tools?\n", r);
1684                                 } else {
1685                                         drbd_info(device, "before-resync-source handler returned %d, "
1686                                                  "dropping connection.\n", r);
1687                                         conn_request_state(connection,
1688                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1689                                         return;
1690                                 }
1691                         }
1692                 }
1693         }
1694
1695         if (current == connection->worker.task) {
1696                 /* The worker should not sleep waiting for state_mutex,
1697                    as that can take a long time */
1698                 if (!mutex_trylock(device->state_mutex)) {
1699                         set_bit(B_RS_H_DONE, &device->flags);
1700                         device->start_resync_timer.expires = jiffies + HZ/5;
1701                         add_timer(&device->start_resync_timer);
1702                         return;
1703                 }
1704         } else {
1705                 mutex_lock(device->state_mutex);
1706         }
1707         clear_bit(B_RS_H_DONE, &device->flags);
1708
1709         /* req_lock: serialize with drbd_send_and_submit() and others
1710          * global_state_lock: for stable sync-after dependencies */
1711         spin_lock_irq(&device->resource->req_lock);
1712         write_lock(&global_state_lock);
1713         /* Did some connection breakage or IO error race with us? */
1714         if (device->state.conn < C_CONNECTED ||
1715             !get_ldev_if_state(device, D_NEGOTIATING)) {
1716                 write_unlock(&global_state_lock);
1717                 spin_unlock_irq(&device->resource->req_lock);
1718                 mutex_unlock(device->state_mutex);
1719                 return;
1720         }
1721
1722         ns = drbd_read_state(device);
1723
1724         ns.aftr_isp = !_drbd_may_sync_now(device);
1725
1726         ns.conn = side;
1727
1728         if (side == C_SYNC_TARGET)
1729                 ns.disk = D_INCONSISTENT;
1730         else /* side == C_SYNC_SOURCE */
1731                 ns.pdsk = D_INCONSISTENT;
1732
1733         r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1734         ns = drbd_read_state(device);
1735
1736         if (ns.conn < C_CONNECTED)
1737                 r = SS_UNKNOWN_ERROR;
1738
1739         if (r == SS_SUCCESS) {
1740                 unsigned long tw = drbd_bm_total_weight(device);
1741                 unsigned long now = jiffies;
1742                 int i;
1743
1744                 device->rs_failed    = 0;
1745                 device->rs_paused    = 0;
1746                 device->rs_same_csum = 0;
1747                 device->rs_last_events = 0;
1748                 device->rs_last_sect_ev = 0;
1749                 device->rs_total     = tw;
1750                 device->rs_start     = now;
1751                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1752                         device->rs_mark_left[i] = tw;
1753                         device->rs_mark_time[i] = now;
1754                 }
1755                 _drbd_pause_after(device);
1756                 /* Forget potentially stale cached per resync extent bit-counts.
1757                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1758                  * disabled, and know the disk state is ok. */
1759                 spin_lock(&device->al_lock);
1760                 lc_reset(device->resync);
1761                 device->resync_locked = 0;
1762                 device->resync_wenr = LC_FREE;
1763                 spin_unlock(&device->al_lock);
1764         }
1765         write_unlock(&global_state_lock);
1766         spin_unlock_irq(&device->resource->req_lock);
1767
1768         if (r == SS_SUCCESS) {
1769                 wake_up(&device->al_wait); /* for lc_reset() above */
1770                 /* reset rs_last_bcast when a resync or verify is started,
1771                  * to deal with potential jiffies wrap. */
1772                 device->rs_last_bcast = jiffies - HZ;
1773
1774                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1775                      drbd_conn_str(ns.conn),
1776                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1777                      (unsigned long) device->rs_total);
1778                 if (side == C_SYNC_TARGET) {
1779                         device->bm_resync_fo = 0;
1780                         device->use_csums = use_checksum_based_resync(connection, device);
1781                 } else {
1782                         device->use_csums = 0;
1783                 }
1784
1785                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1786                  * with w_send_oos, or the sync target will get confused as to
1787                  * how many bits to resync.  We cannot do that always, because for an
1788                  * empty resync and protocol < 95, we need to do it here, as we call
1789                  * drbd_resync_finished from here in that case.
1790                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1791                  * and from after_state_ch otherwise. */
1792                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1793                         drbd_gen_and_send_sync_uuid(peer_device);
1794
1795                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1796                         /* This still has a race (about when exactly the peers
1797                          * detect connection loss) that can lead to a full sync
1798                          * on next handshake. In 8.3.9 we fixed this with explicit
1799                          * resync-finished notifications, but the fix
1800                          * introduces a protocol change.  Sleeping for some
1801                          * time longer than the ping interval + timeout on the
1802                          * SyncSource, to give the SyncTarget the chance to
1803                          * detect connection loss, then waiting for a ping
1804                          * response (implicit in drbd_resync_finished) reduces
1805                          * the race considerably, but does not solve it. */
1806                         if (side == C_SYNC_SOURCE) {
1807                                 struct net_conf *nc;
1808                                 int timeo;
1809
1810                                 rcu_read_lock();
1811                                 nc = rcu_dereference(connection->net_conf);
1812                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1813                                 rcu_read_unlock();
1814                                 schedule_timeout_interruptible(timeo);
1815                         }
1816                         drbd_resync_finished(device);
1817                 }
1818
1819                 drbd_rs_controller_reset(device);
1820                 /* ns.conn may already be != device->state.conn,
1821                  * we may have been paused in between, or become paused until
1822                  * the timer triggers.
1823                  * No matter, that is handled in resync_timer_fn() */
1824                 if (ns.conn == C_SYNC_TARGET)
1825                         mod_timer(&device->resync_timer, jiffies);
1826
1827                 drbd_md_sync(device);
1828         }
1829         put_ldev(device);
1830         mutex_unlock(device->state_mutex);
1831 }
1832
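/* Lazily write out changed bitmap pages and broadcast the sync progress;
 * if the resync just completed, finish it up as well. */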
1833 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1834 {
1835         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1836         device->rs_last_bcast = jiffies;
1837
1838         if (!get_ldev(device))
1839                 return;
1840
1841         drbd_bm_write_lazy(device, 0);
1842         if (resync_done && is_sync_state(device->state.conn))
1843                 drbd_resync_finished(device);
1844
1845         drbd_bcast_event(device, &sib);
1846         /* update timestamp, in case it took a while to write out stuff */
1847         device->rs_last_bcast = jiffies;
1848         put_ldev(device);
1849 }
1850
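/* Final cleanup on the way to diskless: free the resync and activity log
 * LRU caches and the backing device structure, clear GOING_DISKLESS, and
 * wake up waiters. */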
1851 static void drbd_ldev_destroy(struct drbd_device *device)
1852 {
1853         lc_destroy(device->resync);
1854         device->resync = NULL;
1855         lc_destroy(device->act_log);
1856         device->act_log = NULL;
1857         __no_warn(local,
1858                 drbd_free_ldev(device->ldev);
1859                 device->ldev = NULL;);
1860         clear_bit(GOING_DISKLESS, &device->flags);
1861         wake_up(&device->misc_wait);
1862 }
1863
1864 static void go_diskless(struct drbd_device *device)
1865 {
1866         D_ASSERT(device, device->state.disk == D_FAILED);
1867         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1868          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1869          * the protected members anymore, though, so once put_ldev reaches zero
1870          * again, it will be safe to free them. */
1871
1872         /* Try to write changed bitmap pages, read errors may have just
1873          * set some bits outside the area covered by the activity log.
1874          *
1875          * If we have an IO error during the bitmap writeout,
1876          * we will want a full sync next time, just in case.
1877          * (Do we want a specific meta data flag for this?)
1878          *
1879          * If that does not make it to stable storage either,
1880          * we cannot do anything about that anymore.
1881          *
1882          * We still need to check if both bitmap and ldev are present; we may
1883          * end up here after a failed attach, before ldev was even assigned.
1884          */
1885         if (device->bitmap && device->ldev) {
1886                 /* An interrupted resync or similar is allowed to recount bits
1887                  * while we detach.
1888                  * Any modifications would not be expected anymore, though.
1889                  */
1890                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1891                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1892                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1893                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1894                                 drbd_md_sync(device);
1895                         }
1896                 }
1897         }
1898
1899         drbd_force_state(device, NS(disk, D_DISKLESS));
1900 }
1901
1902 static int do_md_sync(struct drbd_device *device)
1903 {
1904         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1905         drbd_md_sync(device);
1906         return 0;
1907 }
1908
1909 #define WORK_PENDING(work_bit, todo)    ((todo) & (1UL << (work_bit)))
1910 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1911 {
1912         if (WORK_PENDING(MD_SYNC, todo))
1913                 do_md_sync(device);
1914         if (WORK_PENDING(RS_DONE, todo) ||
1915             WORK_PENDING(RS_PROGRESS, todo))
1916                 update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
1917         if (WORK_PENDING(GO_DISKLESS, todo))
1918                 go_diskless(device);
1919         if (WORK_PENDING(DESTROY_DISK, todo))
1920                 drbd_ldev_destroy(device);
1921         if (WORK_PENDING(RS_START, todo))
1922                 do_start_resync(device);
1923 }
1924
1925 #define DRBD_DEVICE_WORK_MASK   \
1926         ((1UL << GO_DISKLESS)   \
1927         |(1UL << DESTROY_DISK)  \
1928         |(1UL << MD_SYNC)       \
1929         |(1UL << RS_START)      \
1930         |(1UL << RS_PROGRESS)   \
1931         |(1UL << RS_DONE)       \
1932         )
1933
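/* Atomically fetch-and-clear the device work bits in *flags, using cmpxchg
 * so that bits posted concurrently are not lost. */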
1934 static unsigned long get_work_bits(unsigned long *flags)
1935 {
1936         unsigned long old, new;
1937         do {
1938                 old = *flags;
1939                 new = old & ~DRBD_DEVICE_WORK_MASK;
1940         } while (cmpxchg(flags, old, new) != old);
1941         return old & DRBD_DEVICE_WORK_MASK;
1942 }
1943
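/* Process the device work bits posted (via drbd_device_post_work()) for all
 * devices of this connection. */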
1944 static void do_unqueued_work(struct drbd_connection *connection)
1945 {
1946         struct drbd_peer_device *peer_device;
1947         int vnr;
1948
1949         rcu_read_lock();
1950         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1951                 struct drbd_device *device = peer_device->device;
1952                 unsigned long todo = get_work_bits(&device->flags);
1953                 if (!todo)
1954                         continue;
1955
1956                 kref_get(&device->kref);
1957                 rcu_read_unlock();
1958                 do_device_work(device, todo);
1959                 kref_put(&device->kref, drbd_destroy_device);
1960                 rcu_read_lock();
1961         }
1962         rcu_read_unlock();
1963 }
1964
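/* Move all currently queued work items over to work_list; returns true if
 * work_list is non-empty afterwards. */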
1965 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1966 {
1967         spin_lock_irq(&queue->q_lock);
1968         list_splice_tail_init(&queue->q, work_list);
1969         spin_unlock_irq(&queue->q_lock);
1970         return !list_empty(work_list);
1971 }
1972
1973 static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1974 {
1975         spin_lock_irq(&queue->q_lock);
1976         if (!list_empty(&queue->q))
1977                 list_move(queue->q.next, work_list);
1978         spin_unlock_irq(&queue->q_lock);
1979         return !list_empty(work_list);
1980 }
1981
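/* Wait until there is work for the sender: grab an already queued item if
 * possible; otherwise uncork TCP, close the current write epoch with a
 * barrier if appropriate, and sleep until new work, device work or a signal
 * arrives.  Afterwards, re-cork (or keep uncorked) according to the current
 * net config. */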
1982 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1983 {
1984         DEFINE_WAIT(wait);
1985         struct net_conf *nc;
1986         int uncork, cork;
1987
1988         dequeue_work_item(&connection->sender_work, work_list);
1989         if (!list_empty(work_list))
1990                 return;
1991
1992         /* Still nothing to do?
1993          * Maybe we still need to close the current epoch,
1994          * even if no new requests are queued yet.
1995          *
1996          * Also, poke TCP, just in case.
1997          * Then wait for new work (or signal). */
1998         rcu_read_lock();
1999         nc = rcu_dereference(connection->net_conf);
2000         uncork = nc ? nc->tcp_cork : 0;
2001         rcu_read_unlock();
2002         if (uncork) {
2003                 mutex_lock(&connection->data.mutex);
2004                 if (connection->data.socket)
2005                         drbd_tcp_uncork(connection->data.socket);
2006                 mutex_unlock(&connection->data.mutex);
2007         }
2008
2009         for (;;) {
2010                 int send_barrier;
2011                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2012                 spin_lock_irq(&connection->resource->req_lock);
2013                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2014                 /* dequeue the whole batch at once;
2015                  * list order, and thus drbd_queue_work_front() semantics, is preserved */
2016                 if (!list_empty(&connection->sender_work.q))
2017                         list_splice_tail_init(&connection->sender_work.q, work_list);
2018                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2019                 if (!list_empty(work_list) || signal_pending(current)) {
2020                         spin_unlock_irq(&connection->resource->req_lock);
2021                         break;
2022                 }
2023
2024                 /* We found nothing new to do, no to-be-communicated request,
2025                  * no other work item.  We may still need to close the last
2026                  * epoch.  Next incoming request epoch will be connection ->
2027                  * current transfer log epoch number.  If that is different
2028                  * from the epoch of the last request we communicated, it is
2029                  * safe to send the epoch separating barrier now.
2030                  */
2031                 send_barrier =
2032                         atomic_read(&connection->current_tle_nr) !=
2033                         connection->send.current_epoch_nr;
2034                 spin_unlock_irq(&connection->resource->req_lock);
2035
2036                 if (send_barrier)
2037                         maybe_send_barrier(connection,
2038                                         connection->send.current_epoch_nr + 1);
2039
2040                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2041                         break;
2042
2043                 /* drbd_send() may have called flush_signals() */
2044                 if (get_t_state(&connection->worker) != RUNNING)
2045                         break;
2046
2047                 schedule();
2048                 /* We may be woken up for things other than new work,
2049                  * e.g. if the current epoch got closed.
2050                  * In that case we send the barrier above. */
2051         }
2052         finish_wait(&connection->sender_work.q_wait, &wait);
2053
2054         /* someone may have changed the config while we have been waiting above. */
2055         rcu_read_lock();
2056         nc = rcu_dereference(connection->net_conf);
2057         cork = nc ? nc->tcp_cork : 0;
2058         rcu_read_unlock();
2059         mutex_lock(&connection->data.mutex);
2060         if (connection->data.socket) {
2061                 if (cork)
2062                         drbd_tcp_cork(connection->data.socket);
2063                 else if (!uncork)
2064                         drbd_tcp_uncork(connection->data.socket);
2065         }
2066         mutex_unlock(&connection->data.mutex);
2067 }
2068
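/* Main loop of the per-connection worker thread: process queued work items
 * and posted device work until told to stop, then drain the remaining work
 * and clean up all devices of this connection. */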
2069 int drbd_worker(struct drbd_thread *thi)
2070 {
2071         struct drbd_connection *connection = thi->connection;
2072         struct drbd_work *w = NULL;
2073         struct drbd_peer_device *peer_device;
2074         LIST_HEAD(work_list);
2075         int vnr;
2076
2077         while (get_t_state(thi) == RUNNING) {
2078                 drbd_thread_current_set_cpu(thi);
2079
2080                 if (list_empty(&work_list))
2081                         wait_for_work(connection, &work_list);
2082
2083                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
2084                         do_unqueued_work(connection);
2085
2086                 if (signal_pending(current)) {
2087                         flush_signals(current);
2088                         if (get_t_state(thi) == RUNNING) {
2089                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2090                                 continue;
2091                         }
2092                         break;
2093                 }
2094
2095                 if (get_t_state(thi) != RUNNING)
2096                         break;
2097
2098                 while (!list_empty(&work_list)) {
2099                         w = list_first_entry(&work_list, struct drbd_work, list);
2100                         list_del_init(&w->list);
2101                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2102                                 continue;
2103                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2104                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2105                 }
2106         }
2107
2108         do {
2109                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
2110                         do_unqueued_work(connection);
2111                 while (!list_empty(&work_list)) {
2112                         w = list_first_entry(&work_list, struct drbd_work, list);
2113                         list_del_init(&w->list);
2114                         w->cb(w, 1);
2115                 }
2116                 dequeue_work_batch(&connection->sender_work, &work_list);
2117         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2118
2119         rcu_read_lock();
2120         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2121                 struct drbd_device *device = peer_device->device;
2122                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2123                 kref_get(&device->kref);
2124                 rcu_read_unlock();
2125                 drbd_device_cleanup(device);
2126                 kref_put(&device->kref, drbd_destroy_device);
2127                 rcu_read_lock();
2128         }
2129         rcu_read_unlock();
2130
2131         return 0;
2132 }