/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);

/* endio handlers:
 *   drbd_md_io_complete (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   bm_async_io_complete (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the resync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
        struct drbd_md_io *md_io;
        struct drbd_device *device;

        md_io = (struct drbd_md_io *)bio->bi_private;
        device = container_of(md_io, struct drbd_device, md_io);

        md_io->error = error;

        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
         * to timeout on the lower level device, and eventually detach from it.
         * If this io completion runs after that timeout expired, this
         * drbd_md_put_buffer() may allow us to finally try and re-attach.
         * During normal operation, this only puts that extra reference
         * down to 1 again.
         * Make sure we first drop the reference, and only then signal
         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
         * next drbd_md_sync_page_io(), that we trigger the
         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
         */
        drbd_md_put_buffer(device);
        md_io->done = 1;
        wake_up(&device->misc_wait);
        bio_put(bio);
        if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
                put_ldev(device);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
        unsigned long flags = 0;
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;

        spin_lock_irqsave(&device->resource->req_lock, flags);
        device->read_cnt += peer_req->i.size >> 9;
        list_del(&peer_req->w.list);
        if (list_empty(&device->read_ee))
                wake_up(&device->ee_wait);
        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
                __drbd_chk_io_error(device, DRBD_READ_ERROR);
        spin_unlock_irqrestore(&device->resource->req_lock, flags);

        drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
        put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
        unsigned long flags = 0;
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        struct drbd_interval i;
        int do_wake;
        u64 block_id;
        int do_al_complete_io;

        /* after we moved peer_req to done_ee,
         * we may no longer access it,
         * it may be freed/reused already!
         * (as soon as we release the req_lock) */
        i = peer_req->i;
        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
        block_id = peer_req->block_id;

        spin_lock_irqsave(&device->resource->req_lock, flags);
        device->writ_cnt += peer_req->i.size >> 9;
        list_move_tail(&peer_req->w.list, &device->done_ee);

        /*
         * Do not remove from the write_requests tree here: we did not send the
         * Ack yet and did not wake possibly waiting conflicting requests.
         * Removal from the tree happens in "drbd_process_done_ee" within the
         * appropriate dw.cb (e_end_block/e_end_resync_block) or in
         * _drbd_clear_done_ee.
         */

        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

        /* FIXME do we want to detach for failed REQ_DISCARD?
         * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
        if (peer_req->flags & EE_WAS_ERROR)
                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
        spin_unlock_irqrestore(&device->resource->req_lock, flags);

        if (block_id == ID_SYNCER)
                drbd_rs_complete_io(device, i.sector);

        if (do_wake)
                wake_up(&device->ee_wait);

        if (do_al_complete_io)
                drbd_al_complete_io(device, &i);

        wake_asender(peer_device->connection);
        put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_peer_request_endio(struct bio *bio, int error)
{
        struct drbd_peer_request *peer_req = bio->bi_private;
        struct drbd_device *device = peer_req->peer_device->device;
        int uptodate = bio_flagged(bio, BIO_UPTODATE);
        int is_write = bio_data_dir(bio) == WRITE;
        int is_discard = !!(bio->bi_rw & REQ_DISCARD);

        if (error && __ratelimit(&drbd_ratelimit_state))
                drbd_warn(device, "%s: error=%d s=%llus\n",
                                is_write ? (is_discard ? "discard" : "write")
                                        : "read", error,
                                (unsigned long long)peer_req->i.sector);
        if (!error && !uptodate) {
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
                                        is_write ? "write" : "read",
                                        (unsigned long long)peer_req->i.sector);
                /* strange behavior of some lower level drivers...
                 * fail the request by clearing the uptodate flag,
                 * but do not return any error?! */
                error = -EIO;
        }

        if (error)
                set_bit(__EE_WAS_ERROR, &peer_req->flags);

        bio_put(bio); /* no need for the bio anymore */
        if (atomic_dec_and_test(&peer_req->pending_bios)) {
                if (is_write)
                        drbd_endio_write_sec_final(peer_req);
                else
                        drbd_endio_read_sec_final(peer_req);
        }
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio, int error)
{
        unsigned long flags;
        struct drbd_request *req = bio->bi_private;
        struct drbd_device *device = req->device;
        struct bio_and_error m;
        enum drbd_req_event what;
        int uptodate = bio_flagged(bio, BIO_UPTODATE);

        if (!error && !uptodate) {
                drbd_warn(device, "p %s: setting error to -EIO\n",
                         bio_data_dir(bio) == WRITE ? "write" : "read");
                /* strange behavior of some lower level drivers...
                 * fail the request by clearing the uptodate flag,
                 * but do not return any error?! */
                error = -EIO;
        }


        /* If this request was aborted locally before,
         * but now was completed "successfully",
         * chances are that this caused arbitrary data corruption.
         *
         * "aborting" requests, or force-detaching the disk, is intended for
         * completely blocked/hung local backing devices which no longer
         * complete requests at all, not even do error completions.  In this
         * situation, usually a hard-reset and failover is the only way out.
         *
         * By "aborting", basically faking a local error-completion,
         * we allow for a more graceful switchover by cleanly migrating services.
         * Still the affected node has to be rebooted "soon".
         *
         * By completing these requests, we allow the upper layers to re-use
         * the associated data pages.
         *
         * If later the local backing device "recovers", and now DMAs some data
         * from disk into the original request pages, in the best case it will
         * just put random data into unused pages; but typically it will corrupt
         * meanwhile completely unrelated data, causing all sorts of damage.
         *
         * Which means delayed successful completion,
         * especially for READ requests,
         * is a reason to panic().
         *
         * We assume that a delayed *error* completion is OK,
         * though we still will complain noisily about it.
         */
        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

                if (!error)
                        panic("possible random memory corruption caused by delayed completion of aborted local request\n");
        }

        /* to avoid recursion in __req_mod */
        if (unlikely(error)) {
                if (bio->bi_rw & REQ_DISCARD)
                        what = (error == -EOPNOTSUPP)
                                ? DISCARD_COMPLETED_NOTSUPP
                                : DISCARD_COMPLETED_WITH_ERROR;
                else
                        what = (bio_data_dir(bio) == WRITE)
                        ? WRITE_COMPLETED_WITH_ERROR
                        : (bio_rw(bio) == READ)
                          ? READ_COMPLETED_WITH_ERROR
                          : READ_AHEAD_COMPLETED_WITH_ERROR;
        } else
                what = COMPLETED_OK;

        bio_put(req->private_bio);
        req->private_bio = ERR_PTR(error);

        /* not req_mod(), we need irqsave here! */
        spin_lock_irqsave(&device->resource->req_lock, flags);
        __req_mod(req, what, &m);
        spin_unlock_irqrestore(&device->resource->req_lock, flags);
        put_ldev(device);

        if (m.bio)
                complete_master_bio(device, &m);
}

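/* Compute a digest with the given transform over all pages of a peer
 * request: every page but the last is fully covered, the last one only up
 * to peer_req->i.size.  drbd_csum_bio() below does the same for a bio,
 * segment by segment. */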
void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
{
        struct hash_desc desc;
        struct scatterlist sg;
        struct page *page = peer_req->pages;
        struct page *tmp;
        unsigned len;

        desc.tfm = tfm;
        desc.flags = 0;

        sg_init_table(&sg, 1);
        crypto_hash_init(&desc);

        while ((tmp = page_chain_next(page))) {
                /* all but the last page will be fully used */
                sg_set_page(&sg, page, PAGE_SIZE, 0);
                crypto_hash_update(&desc, &sg, sg.length);
                page = tmp;
        }
        /* and now the last, possibly only partially used page */
        len = peer_req->i.size & (PAGE_SIZE - 1);
        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
        crypto_hash_update(&desc, &sg, sg.length);
        crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
{
        struct hash_desc desc;
        struct scatterlist sg;
        struct bio_vec bvec;
        struct bvec_iter iter;

        desc.tfm = tfm;
        desc.flags = 0;

        sg_init_table(&sg, 1);
        crypto_hash_init(&desc);

        bio_for_each_segment(bvec, bio, iter) {
                sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
                crypto_hash_update(&desc, &sg, sg.length);
        }
        crypto_hash_final(&desc, digest);
}

/* MAYBE merge common code with w_e_end_ov_req */
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        int digest_size;
        void *digest;
        int err = 0;

        if (unlikely(cancel))
                goto out;

        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
                goto out;

        digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
        digest = kmalloc(digest_size, GFP_NOIO);
        if (digest) {
                sector_t sector = peer_req->i.sector;
                unsigned int size = peer_req->i.size;
                drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
                /* Free peer_req and pages before send.
                 * In case we block on congestion, we could otherwise run into
                 * some distributed deadlock, if the other side blocks on
                 * congestion as well, because our receiver blocks in
                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
                drbd_free_peer_req(device, peer_req);
                peer_req = NULL;
                inc_rs_pending(device);
                err = drbd_send_drequest_csum(peer_device, sector, size,
                                              digest, digest_size,
                                              P_CSUM_RS_REQUEST);
                kfree(digest);
        } else {
                drbd_err(device, "kmalloc() of digest failed.\n");
                err = -ENOMEM;
        }

out:
        if (peer_req)
                drbd_free_peer_req(device, peer_req);

        if (unlikely(err))
                drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
        return err;
}

#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

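/* Read the given area from the local disk so that its checksum can be sent
 * to the peer (checksum based resync).  The completed read is handed to
 * w_e_send_csum() via the read_ee list.  Returns -EIO if we cannot get a
 * reference on the local disk, and -EAGAIN ("defer") if the backing device
 * is congested or no memory is available right now. */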
static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;

        if (!get_ldev(device))
                return -EIO;

        if (drbd_rs_should_slow_down(device, sector))
                goto defer;

        /* GFP_TRY, because if there is no memory available right now, this may
         * be rescheduled for later. It is "only" background resync, after all. */
        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
                                       size, true /* has real payload */, GFP_TRY);
        if (!peer_req)
                goto defer;

        peer_req->w.cb = w_e_send_csum;
        spin_lock_irq(&device->resource->req_lock);
        list_add(&peer_req->w.list, &device->read_ee);
        spin_unlock_irq(&device->resource->req_lock);

        atomic_add(size >> 9, &device->rs_sect_ev);
        if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
                return 0;

        /* If it failed because of ENOMEM, retry should help.  If it failed
         * because bio_add_page failed (probably broken lower level driver),
         * retry may or may not help.
         * If it does not, you may need to force disconnect. */
        spin_lock_irq(&device->resource->req_lock);
        list_del(&peer_req->w.list);
        spin_unlock_irq(&device->resource->req_lock);

        drbd_free_peer_req(device, peer_req);
defer:
        put_ldev(device);
        return -EAGAIN;
}

int w_resync_timer(struct drbd_work *w, int cancel)
{
        struct drbd_device *device =
                container_of(w, struct drbd_device, resync_work);

        switch (device->state.conn) {
        case C_VERIFY_S:
                make_ov_request(device, cancel);
                break;
        case C_SYNC_TARGET:
                make_resync_request(device, cancel);
                break;
        }

        return 0;
}

void resync_timer_fn(unsigned long data)
{
        struct drbd_device *device = (struct drbd_device *) data;

        if (list_empty(&device->resync_work.list))
                drbd_queue_work(&first_peer_device(device)->connection->sender_work,
                                &device->resync_work);
}

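/* The fifo_buffer below serves as the "plan" of the resync rate controller:
 * one slot per future SLEEP_TIME step, each holding a correction (in
 * sectors) that is still to be applied.  fifo_push() returns the correction
 * planned for the current step and queues a new value in its place. */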
static void fifo_set(struct fifo_buffer *fb, int value)
{
        int i;

        for (i = 0; i < fb->size; i++)
                fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
        int ov;

        ov = fb->values[fb->head_index];
        fb->values[fb->head_index++] = value;

        if (fb->head_index >= fb->size)
                fb->head_index = 0;

        return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
        int i;

        for (i = 0; i < fb->size; i++)
                fb->values[i] += value;
}

struct fifo_buffer *fifo_alloc(int fifo_size)
{
        struct fifo_buffer *fb;

        fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
        if (!fb)
                return NULL;

        fb->head_index = 0;
        fb->size = fifo_size;
        fb->total = 0;

        return fb;
}

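/* Feedback controller implementing the dynamic resync speed.  Based on the
 * number of sectors that came back from the peer since the last invocation
 * (sect_in), estimate how much data we want in flight (c-fill-target, or
 * derived from c-delay-target), spread the required correction over the
 * planning fifo, and return the number of sectors to request during the
 * next SLEEP_TIME interval, capped by c-max-rate.
 * Called by drbd_rs_number_requests() under rcu_read_lock(). */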
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
        struct disk_conf *dc;
        unsigned int want;     /* The number of sectors we want in the proxy */
        int req_sect; /* Number of sectors to request in this turn */
        int correction; /* Number of sectors more we need in the proxy*/
        int cps; /* correction per invocation of drbd_rs_controller() */
        int steps; /* Number of time steps to plan ahead */
        int curr_corr;
        int max_sect;
        struct fifo_buffer *plan;

        dc = rcu_dereference(device->ldev->disk_conf);
        plan = rcu_dereference(device->rs_plan_s);

        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
        } else { /* normal path */
                want = dc->c_fill_target ? dc->c_fill_target :
                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
        }

        correction = want - device->rs_in_flight - plan->total;

        /* Plan ahead */
        cps = correction / steps;
        fifo_add_val(plan, cps);
        plan->total += cps * steps;

        /* What we do in this step */
        curr_corr = fifo_push(plan, 0);
        plan->total -= curr_corr;

        req_sect = sect_in + curr_corr;
        if (req_sect < 0)
                req_sect = 0;

        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
        if (req_sect > max_sect)
                req_sect = max_sect;

        /*
        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
                 sect_in, device->rs_in_flight, want, correction,
                 steps, cps, device->rs_planed, curr_corr, req_sect);
        */

        return req_sect;
}

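/* Turn the controller output (or the static resync rate, if no plan is
 * configured) into a number of BM_BLOCK_SIZE sized requests for this turn,
 * never allowing more than max-buffers/2 requests in flight. */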
static int drbd_rs_number_requests(struct drbd_device *device)
{
        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
        int number, mxb;

        sect_in = atomic_xchg(&device->rs_sect_in, 0);
        device->rs_in_flight -= sect_in;

        rcu_read_lock();
        mxb = drbd_get_max_buffers(device) / 2;
        if (rcu_dereference(device->rs_plan_s)->size) {
                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
        } else {
                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
        }
        rcu_read_unlock();

        /* Don't have more than "max-buffers"/2 in-flight.
         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
         * potentially causing a distributed deadlock on congestion during
         * online-verify or (checksum-based) resync, if max-buffers,
         * socket buffer sizes and resync rate settings are mis-configured. */
        if (mxb - device->rs_in_flight < number)
                number = mxb - device->rs_in_flight;

        return number;
}

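/* Issue up to drbd_rs_number_requests() resync requests for the next dirty
 * bits in the bitmap: either P_RS_DATA_REQUEST packets, or local reads for
 * checksum based resync.  Adjacent dirty bits are merged into larger,
 * aligned requests where possible.  Whenever we have to back off (send
 * buffer half full, backing device busy, controller says "not now"), the
 * resync timer is re-armed to retry after SLEEP_TIME. */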
static int make_resync_request(struct drbd_device *device, int cancel)
{
        unsigned long bit;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        int max_bio_size;
        int number, rollback_i, size;
        int align, queued, sndbuf;
        int i = 0;

        if (unlikely(cancel))
                return 0;

        if (device->rs_total == 0) {
                /* empty resync? */
                drbd_resync_finished(device);
                return 0;
        }

        if (!get_ldev(device)) {
                /* Since we only need to access device->rsync a
                   get_ldev_if_state(device,D_FAILED) would be sufficient, but
                   continuing resync with a broken disk makes no sense at
                   all */
                drbd_err(device, "Disk broke down during resync!\n");
                return 0;
        }

        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
        number = drbd_rs_number_requests(device);
        if (number <= 0)
                goto requeue;

        for (i = 0; i < number; i++) {
                /* Stop generating RS requests, when half of the send buffer is filled */
                mutex_lock(&first_peer_device(device)->connection->data.mutex);
                if (first_peer_device(device)->connection->data.socket) {
                        queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
                        sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
                } else {
                        queued = 1;
                        sndbuf = 0;
                }
                mutex_unlock(&first_peer_device(device)->connection->data.mutex);
                if (queued > sndbuf / 2)
                        goto requeue;

next_sector:
                size = BM_BLOCK_SIZE;
                bit  = drbd_bm_find_next(device, device->bm_resync_fo);

                if (bit == DRBD_END_OF_BITMAP) {
                        device->bm_resync_fo = drbd_bm_bits(device);
                        put_ldev(device);
                        return 0;
                }

                sector = BM_BIT_TO_SECT(bit);

                if (drbd_rs_should_slow_down(device, sector) ||
                    drbd_try_rs_begin_io(device, sector)) {
                        device->bm_resync_fo = bit;
                        goto requeue;
                }
                device->bm_resync_fo = bit + 1;

                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
                        drbd_rs_complete_io(device, sector);
                        goto next_sector;
                }

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
                /* try to find some adjacent bits.
                 * we stop if we have already the maximum req size.
                 *
                 * Additionally always align bigger requests, in order to
                 * be prepared for all stripe sizes of software RAIDs.
                 */
                align = 1;
                rollback_i = i;
                while (i < number) {
                        if (size + BM_BLOCK_SIZE > max_bio_size)
                                break;

                        /* Be always aligned */
                        if (sector & ((1<<(align+3))-1))
                                break;

                        /* do not cross extent boundaries */
                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
                                break;
                        /* now, is it actually dirty, after all?
                         * caution, drbd_bm_test_bit is tri-state for some
                         * obscure reason; ( b == 0 ) would get the out-of-band
                         * only accidentally right because of the "oddly sized"
                         * adjustment below */
                        if (drbd_bm_test_bit(device, bit+1) != 1)
                                break;
                        bit++;
                        size += BM_BLOCK_SIZE;
                        if ((BM_BLOCK_SIZE << align) <= size)
                                align++;
                        i++;
                }
                /* if we merged some,
                 * reset the offset to start the next drbd_bm_find_next from */
                if (size > BM_BLOCK_SIZE)
                        device->bm_resync_fo = bit + 1;
#endif

                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;
                if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
                    first_peer_device(device)->connection->csums_tfm) {
                        switch (read_for_csum(first_peer_device(device), sector, size)) {
                        case -EIO: /* Disk failure */
                                put_ldev(device);
                                return -EIO;
                        case -EAGAIN: /* allocation failed, or ldev busy */
                                drbd_rs_complete_io(device, sector);
                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
                                i = rollback_i;
                                goto requeue;
                        case 0:
                                /* everything ok */
                                break;
                        default:
                                BUG();
                        }
                } else {
                        int err;

                        inc_rs_pending(device);
                        err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
                                                 sector, size, ID_SYNCER);
                        if (err) {
                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
                                dec_rs_pending(device);
                                put_ldev(device);
                                return err;
                        }
                }
        }

        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
                /* last syncer _request_ was sent,
                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
                 * next sync group will resume), as soon as we receive the last
                 * resync data block, and the last bit is cleared.
                 * until then resync "work" is "inactive" ...
                 */
                put_ldev(device);
                return 0;
        }

 requeue:
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        put_ldev(device);
        return 0;
}

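/* Online verify counterpart of make_resync_request(): send up to "number"
 * P_OV_REQUESTs starting at device->ov_position, stopping early when the
 * configured stop sector is reached or the backing device is busy. */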
static int make_ov_request(struct drbd_device *device, int cancel)
{
        int number, i, size;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        bool stop_sector_reached = false;

        if (unlikely(cancel))
                return 1;

        number = drbd_rs_number_requests(device);

        sector = device->ov_position;
        for (i = 0; i < number; i++) {
                if (sector >= capacity)
                        return 1;

                /* We check for "finished" only in the reply path:
                 * w_e_end_ov_reply().
                 * We need to send at least one request out. */
                stop_sector_reached = i > 0
                        && verify_can_do_stop_sector(device)
                        && sector >= device->ov_stop_sector;
                if (stop_sector_reached)
                        break;

                size = BM_BLOCK_SIZE;

                if (drbd_rs_should_slow_down(device, sector) ||
                    drbd_try_rs_begin_io(device, sector)) {
                        device->ov_position = sector;
                        goto requeue;
                }

                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                inc_rs_pending(device);
                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
                        dec_rs_pending(device);
                        return 0;
                }
                sector += BM_SECT_PER_BIT;
        }
        device->ov_position = sector;

 requeue:
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        if (i == 0 || !stop_sector_reached)
                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        return 1;
}

int w_ov_finished(struct drbd_work *w, int cancel)
{
        struct drbd_device_work *dw =
                container_of(w, struct drbd_device_work, w);
        struct drbd_device *device = dw->device;
        kfree(dw);
        ov_out_of_sync_print(device);
        drbd_resync_finished(device);

        return 0;
}

static int w_resync_finished(struct drbd_work *w, int cancel)
{
        struct drbd_device_work *dw =
                container_of(w, struct drbd_device_work, w);
        struct drbd_device *device = dw->device;
        kfree(dw);

        drbd_resync_finished(device);

        return 0;
}

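/* Send a ping to the peer and wait until the ping ack arrives, or until the
 * connection is lost. */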
static void ping_peer(struct drbd_device *device)
{
        struct drbd_connection *connection = first_peer_device(device)->connection;

        clear_bit(GOT_PING_ACK, &connection->flags);
        request_ping(connection);
        wait_event(connection->ping_wait,
                   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
}

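/* Called when a resync or online verify run ends: report the achieved
 * throughput, update disk/peer disk state and the UUIDs according to the
 * outcome, and possibly invoke the "out-of-sync" or "after-resync-target"
 * user space helpers.  If the resync LRU cannot be cleaned up yet, re-queue
 * this work and retry in 100ms. */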
int drbd_resync_finished(struct drbd_device *device)
{
        unsigned long db, dt, dbdt;
        unsigned long n_oos;
        union drbd_state os, ns;
        struct drbd_device_work *dw;
        char *khelper_cmd = NULL;
        int verify_done = 0;

        /* Remove all elements from the resync LRU. Since future actions
         * might set bits in the (main) bitmap, then the entries in the
         * resync LRU would be wrong. */
        if (drbd_rs_del_all(device)) {
                /* In case this is not possible now, most probably because
                 * there are P_RS_DATA_REPLY packets lingering on the worker's
                 * queue (or even the read operations for those packets
                 * are not finished by now).  Retry in 100ms. */

                schedule_timeout_interruptible(HZ / 10);
                dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
                if (dw) {
                        dw->w.cb = w_resync_finished;
                        dw->device = device;
                        drbd_queue_work(&first_peer_device(device)->connection->sender_work,
                                        &dw->w);
                        return 1;
                }
                drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
        }

        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
        if (dt <= 0)
                dt = 1;

        db = device->rs_total;
        /* adjust for verify start and stop sectors, respective reached position */
        if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
                db -= device->ov_left;

        dbdt = Bit2KB(db/dt);
        device->rs_paused /= HZ;

        if (!get_ldev(device))
                goto out;

        ping_peer(device);

        spin_lock_irq(&device->resource->req_lock);
        os = drbd_read_state(device);

        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

        /* This protects us against multiple calls (that can happen in the presence
           of application IO), and against connectivity loss just before we arrive here. */
        if (os.conn <= C_CONNECTED)
                goto out_unlock;

        ns = os;
        ns.conn = C_CONNECTED;

        drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
             verify_done ? "Online verify" : "Resync",
             dt + device->rs_paused, device->rs_paused, dbdt);

        n_oos = drbd_bm_total_weight(device);

        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
                if (n_oos) {
                        drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
                              n_oos, Bit2KB(1));
                        khelper_cmd = "out-of-sync";
                }
        } else {
                D_ASSERT(device, (n_oos - device->rs_failed) == 0);

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
                        khelper_cmd = "after-resync-target";

                if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
                        const unsigned long s = device->rs_same_csum;
                        const unsigned long t = device->rs_total;
                        const int ratio =
                                (t == 0)     ? 0 :
                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
                        drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
                             "transferred %luK total %luK\n",
                             ratio,
                             Bit2KB(device->rs_same_csum),
                             Bit2KB(device->rs_total - device->rs_same_csum),
                             Bit2KB(device->rs_total));
                }
        }

        if (device->rs_failed) {
                drbd_info(device, "            %lu failed blocks\n", device->rs_failed);

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
                        ns.disk = D_INCONSISTENT;
                        ns.pdsk = D_UP_TO_DATE;
                } else {
                        ns.disk = D_UP_TO_DATE;
                        ns.pdsk = D_INCONSISTENT;
                }
        } else {
                ns.disk = D_UP_TO_DATE;
                ns.pdsk = D_UP_TO_DATE;

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
                        if (device->p_uuid) {
                                int i;
                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
                                        _drbd_uuid_set(device, i, device->p_uuid[i]);
                                drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
                                _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
                        } else {
                                drbd_err(device, "device->p_uuid is NULL! BUG\n");
                        }
                }

                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
                        /* for verify runs, we don't update uuids here,
                         * so there would be nothing to report. */
                        drbd_uuid_set_bm(device, 0UL);
                        drbd_print_uuids(device, "updated UUIDs");
                        if (device->p_uuid) {
                                /* Now the two UUID sets are equal, update what we
                                 * know of the peer. */
                                int i;
                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
                                        device->p_uuid[i] = device->ldev->md.uuid[i];
                        }
                }
        }

        _drbd_set_state(device, ns, CS_VERBOSE, NULL);
out_unlock:
        spin_unlock_irq(&device->resource->req_lock);
        put_ldev(device);
out:
        device->rs_total  = 0;
        device->rs_failed = 0;
        device->rs_paused = 0;

        /* reset start sector, if we reached end of device */
        if (verify_done && device->ov_left == 0)
                device->ov_start_sector = 0;

        drbd_md_sync(device);

        if (khelper_cmd)
                drbd_khelper(device, khelper_cmd);

        return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
        if (drbd_peer_req_has_active_page(peer_req)) {
                /* This might happen if sendpage() has not finished */
                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
                atomic_add(i, &device->pp_in_use_by_net);
                atomic_sub(i, &device->pp_in_use);
                spin_lock_irq(&device->resource->req_lock);
                list_add_tail(&peer_req->w.list, &device->net_ee);
                spin_unlock_irq(&device->resource->req_lock);
                wake_up(&drbd_pp_wait);
        } else
                drbd_free_peer_req(device, peer_req);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @w:          work object.
 * @cancel:     The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        int err;

        if (unlikely(cancel)) {
                drbd_free_peer_req(device, peer_req);
                dec_unacked(device);
                return 0;
        }

        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
                err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
        } else {
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_err(device, "Sending NegDReply. sector=%llus.\n",
                            (unsigned long long)peer_req->i.sector);

                err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
        }

        dec_unacked(device);

        move_to_net_ee_or_free(device, peer_req);

        if (unlikely(err))
                drbd_err(device, "drbd_send_block() failed\n");
        return err;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @w:          work object.
 * @cancel:     The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        int err;

        if (unlikely(cancel)) {
                drbd_free_peer_req(device, peer_req);
                dec_unacked(device);
                return 0;
        }

        if (get_ldev_if_state(device, D_FAILED)) {
                drbd_rs_complete_io(device, peer_req->i.sector);
                put_ldev(device);
        }

        if (device->state.conn == C_AHEAD) {
                err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
                if (likely(device->state.pdsk >= D_INCONSISTENT)) {
                        inc_rs_pending(device);
                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
                } else {
                        if (__ratelimit(&drbd_ratelimit_state))
                                drbd_err(device, "Not sending RSDataReply, "
                                    "partner DISKLESS!\n");
                        err = 0;
                }
        } else {
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
                            (unsigned long long)peer_req->i.sector);

                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);

                /* update resync data with failure */
                drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
        }

        dec_unacked(device);

        move_to_net_ee_or_free(device, peer_req);

        if (unlikely(err))
                drbd_err(device, "drbd_send_block() failed\n");
        return err;
}

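/* Worker callback for checksum based resync requests: compare the digest
 * received from the peer with the digest of the local data.  If they match,
 * acknowledge with P_RS_IS_IN_SYNC; otherwise send the whole block back as
 * P_RS_DATA_REPLY. */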
int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        struct digest_info *di;
        int digest_size;
        void *digest = NULL;
        int err, eq = 0;

        if (unlikely(cancel)) {
                drbd_free_peer_req(device, peer_req);
                dec_unacked(device);
                return 0;
        }

        if (get_ldev(device)) {
                drbd_rs_complete_io(device, peer_req->i.sector);
                put_ldev(device);
        }

        di = peer_req->digest;

        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
                /* quick hack to try to avoid a race against reconfiguration.
                 * a real fix would be much more involved,
                 * introducing more locking mechanisms */
                if (peer_device->connection->csums_tfm) {
                        digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
                        D_ASSERT(device, digest_size == di->digest_size);
                        digest = kmalloc(digest_size, GFP_NOIO);
                }
                if (digest) {
                        drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
                        eq = !memcmp(digest, di->digest, digest_size);
                        kfree(digest);
                }

                if (eq) {
                        drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
                        /* rs_same_csums unit is BM_BLOCK_SIZE */
                        device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
                        err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
                } else {
                        inc_rs_pending(device);
                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
                        kfree(di);
                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
                }
        } else {
                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
        }

        dec_unacked(device);
        move_to_net_ee_or_free(device, peer_req);

        if (unlikely(err))
                drbd_err(device, "drbd_send_block/ack() failed\n");
        return err;
}

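/* Worker callback for an online verify request received from the peer:
 * hash the locally read block and send the digest back in a P_OV_REPLY
 * packet. */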
int w_e_end_ov_req(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        sector_t sector = peer_req->i.sector;
        unsigned int size = peer_req->i.size;
        int digest_size;
        void *digest;
        int err = 0;

        if (unlikely(cancel))
                goto out;

        digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
        digest = kmalloc(digest_size, GFP_NOIO);
        if (!digest) {
                err = 1;        /* terminate the connection in case the allocation failed */
                goto out;
        }

        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
                drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
        else
                memset(digest, 0, digest_size);

        /* Free e and pages before send.
         * In case we block on congestion, we could otherwise run into
         * some distributed deadlock, if the other side blocks on
         * congestion as well, because our receiver blocks in
         * drbd_alloc_pages due to pp_in_use > max_buffers. */
        drbd_free_peer_req(device, peer_req);
        peer_req = NULL;
        inc_rs_pending(device);
        err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
        if (err)
                dec_rs_pending(device);
        kfree(digest);

out:
        if (peer_req)
                drbd_free_peer_req(device, peer_req);
        dec_unacked(device);
        return err;
}

void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
{
        if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
                device->ov_last_oos_size += size>>9;
        } else {
                device->ov_last_oos_start = sector;
                device->ov_last_oos_size = size>>9;
        }
        drbd_set_out_of_sync(device, sector, size);
}

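/* Worker callback for an online verify reply: compare the peer's digest
 * with the digest of the local data, record a mismatch as out of sync, and
 * report the result to the peer with P_OV_RESULT.  Once the last block (or
 * the stop sector) has been processed, finish the verify run. */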
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        struct digest_info *di;
        void *digest;
        sector_t sector = peer_req->i.sector;
        unsigned int size = peer_req->i.size;
        int digest_size;
        int err, eq = 0;
        bool stop_sector_reached = false;

        if (unlikely(cancel)) {
                drbd_free_peer_req(device, peer_req);
                dec_unacked(device);
                return 0;
        }

        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
         * the resync lru has been cleaned up already */
        if (get_ldev(device)) {
                drbd_rs_complete_io(device, peer_req->i.sector);
                put_ldev(device);
        }

        di = peer_req->digest;

        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
                digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
                digest = kmalloc(digest_size, GFP_NOIO);
                if (digest) {
                        drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);

                        D_ASSERT(device, digest_size == di->digest_size);
                        eq = !memcmp(digest, di->digest, digest_size);
                        kfree(digest);
                }
        }

        /* Free peer_req and pages before send.
         * In case we block on congestion, we could otherwise run into
         * some distributed deadlock, if the other side blocks on
         * congestion as well, because our receiver blocks in
         * drbd_alloc_pages due to pp_in_use > max_buffers. */
        drbd_free_peer_req(device, peer_req);
        if (!eq)
                drbd_ov_out_of_sync_found(device, sector, size);
        else
                ov_out_of_sync_print(device);

        err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

        dec_unacked(device);

        --device->ov_left;

        /* let's advance progress step marks only for every other megabyte */
        if ((device->ov_left & 0x200) == 0x200)
                drbd_advance_rs_marks(device, device->ov_left);

        stop_sector_reached = verify_can_do_stop_sector(device) &&
                (sector + (size>>9)) >= device->ov_stop_sector;

        if (device->ov_left == 0 || stop_sector_reached) {
                ov_out_of_sync_print(device);
                drbd_resync_finished(device);
        }

        return err;
}

/* FIXME
 * We need to track the number of pending barrier acks,
 * and to be able to wait for them.
 * See also comment in drbd_adm_attach before drbd_suspend_io.
 */
static int drbd_send_barrier(struct drbd_connection *connection)
{
        struct p_barrier *p;
        struct drbd_socket *sock;

        sock = &connection->data;
        p = conn_prepare_command(connection, sock);
        if (!p)
                return -EIO;
        p->barrier = connection->send.current_epoch_nr;
        p->pad = 0;
        connection->send.current_epoch_writes = 0;

        return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
}

int w_send_write_hint(struct drbd_work *w, int cancel)
{
        struct drbd_device *device =
                container_of(w, struct drbd_device, unplug_work);
        struct drbd_socket *sock;

        if (cancel)
                return 0;
        sock = &first_peer_device(device)->connection->data;
        if (!drbd_prepare_command(first_peer_device(device), sock))
                return -EIO;
        return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
}

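/* Epoch/barrier bookkeeping for the sender: the first write on a connection
 * initializes the current epoch; whenever a request belongs to a newer
 * epoch than the one we are currently sending for, the old epoch is closed
 * with a P_BARRIER first, but only if it actually contained writes. */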
static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
{
        if (!connection->send.seen_any_write_yet) {
                connection->send.seen_any_write_yet = true;
                connection->send.current_epoch_nr = epoch;
                connection->send.current_epoch_writes = 0;
        }
}

static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
{
        /* nothing to do before the first write on this connection */
        if (!connection->send.seen_any_write_yet)
                return;
        if (connection->send.current_epoch_nr != epoch) {
                if (connection->send.current_epoch_writes)
                        drbd_send_barrier(connection);
                connection->send.current_epoch_nr = epoch;
        }
}
1349
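/**
 * w_send_out_of_sync() - Worker callback to send a P_OUT_OF_SYNC packet while in AHEAD mode
 * @w:          work object.
 * @cancel:     The connection will be closed anyway
 */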
1350 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1351 {
1352         struct drbd_request *req = container_of(w, struct drbd_request, w);
1353         struct drbd_device *device = req->device;
1354         struct drbd_connection *connection = first_peer_device(device)->connection;
1355         int err;
1356
1357         if (unlikely(cancel)) {
1358                 req_mod(req, SEND_CANCELED);
1359                 return 0;
1360         }
1361
1362         /* this time, no connection->send.current_epoch_writes++;
1363          * If it was sent, it was the closing barrier for the last
1364          * replicated epoch, before we went into AHEAD mode.
1365          * No more barriers will be sent, until we leave AHEAD mode again. */
1366         maybe_send_barrier(connection, req->epoch);
1367
1368         err = drbd_send_out_of_sync(first_peer_device(device), req);
1369         req_mod(req, OOS_HANDED_TO_NETWORK);
1370
1371         return err;
1372 }
1373
1374 /**
1375  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1376  * @w:          work object.
1377  * @cancel:     The connection will be closed anyway
1378  */
1379 int w_send_dblock(struct drbd_work *w, int cancel)
1380 {
1381         struct drbd_request *req = container_of(w, struct drbd_request, w);
1382         struct drbd_device *device = req->device;
1383         struct drbd_connection *connection = first_peer_device(device)->connection;
1384         int err;
1385
1386         if (unlikely(cancel)) {
1387                 req_mod(req, SEND_CANCELED);
1388                 return 0;
1389         }
1390
1391         re_init_if_first_write(connection, req->epoch);
1392         maybe_send_barrier(connection, req->epoch);
1393         connection->send.current_epoch_writes++;
1394
1395         err = drbd_send_dblock(first_peer_device(device), req);
1396         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1397
1398         return err;
1399 }
1400
1401 /**
1402  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1403  * @w:          work object.
1404  * @cancel:     The connection will be closed anyway
1405  */
1406 int w_send_read_req(struct drbd_work *w, int cancel)
1407 {
1408         struct drbd_request *req = container_of(w, struct drbd_request, w);
1409         struct drbd_device *device = req->device;
1410         struct drbd_connection *connection = first_peer_device(device)->connection;
1411         int err;
1412
1413         if (unlikely(cancel)) {
1414                 req_mod(req, SEND_CANCELED);
1415                 return 0;
1416         }
1417
1418         /* Even read requests may close a write epoch,
1419          * if the previous epoch saw any writes. */
1420         maybe_send_barrier(connection, req->epoch);
1421
1422         err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
1423                                  (unsigned long)req);
1424
1425         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1426
1427         return err;
1428 }
1429
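/**
 * w_restart_disk_io() - Worker callback to re-submit the master bio to the local backing device
 * @w:          work object.
 * @cancel:     ignored; the request is resubmitted unconditionally
 */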
1430 int w_restart_disk_io(struct drbd_work *w, int cancel)
1431 {
1432         struct drbd_request *req = container_of(w, struct drbd_request, w);
1433         struct drbd_device *device = req->device;
1434
1435         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1436                 drbd_al_begin_io(device, &req->i, false);
1437
1438         drbd_req_make_private_bio(req, req->master_bio);
1439         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1440         generic_make_request(req->private_bio);
1441
1442         return 0;
1443 }
1444
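/* Walk the resync-after dependency chain.  Returns 0 if any device we
 * depend on is currently resyncing or has a sync-pause flag set,
 * 1 if we may resync now. */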
1445 static int _drbd_may_sync_now(struct drbd_device *device)
1446 {
1447         struct drbd_device *odev = device;
1448         int resync_after;
1449
1450         while (1) {
1451                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1452                         return 1;
1453                 rcu_read_lock();
1454                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1455                 rcu_read_unlock();
1456                 if (resync_after == -1)
1457                         return 1;
1458                 odev = minor_to_device(resync_after);
1459                 if (!odev)
1460                         return 1;
1461                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1462                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1463                     odev->state.aftr_isp || odev->state.peer_isp ||
1464                     odev->state.user_isp)
1465                         return 0;
1466         }
1467 }
1468
1469 /**
1470  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1471  * @device:     DRBD device.
1472  *
1473  * Called from process context only (admin command and after_state_ch).
1474  */
1475 static int _drbd_pause_after(struct drbd_device *device)
1476 {
1477         struct drbd_device *odev;
1478         int i, rv = 0;
1479
1480         rcu_read_lock();
1481         idr_for_each_entry(&drbd_devices, odev, i) {
1482                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1483                         continue;
1484                 if (!_drbd_may_sync_now(odev))
1485                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1486                                != SS_NOTHING_TO_DO);
1487         }
1488         rcu_read_unlock();
1489
1490         return rv;
1491 }
1492
1493 /**
1494  * _drbd_resume_next() - Resume resync on all devices that may resync now
1495  * @device:     DRBD device.
1496  *
1497  * Called from process context only (admin command and worker).
1498  */
1499 static int _drbd_resume_next(struct drbd_device *device)
1500 {
1501         struct drbd_device *odev;
1502         int i, rv = 0;
1503
1504         rcu_read_lock();
1505         idr_for_each_entry(&drbd_devices, odev, i) {
1506                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1507                         continue;
1508                 if (odev->state.aftr_isp) {
1509                         if (_drbd_may_sync_now(odev))
1510                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1511                                                         CS_HARD, NULL)
1512                                        != SS_NOTHING_TO_DO);
1513                 }
1514         }
1515         rcu_read_unlock();
1516         return rv;
1517 }
1518
1519 void resume_next_sg(struct drbd_device *device)
1520 {
1521         write_lock_irq(&global_state_lock);
1522         _drbd_resume_next(device);
1523         write_unlock_irq(&global_state_lock);
1524 }
1525
1526 void suspend_other_sg(struct drbd_device *device)
1527 {
1528         write_lock_irq(&global_state_lock);
1529         _drbd_pause_after(device);
1530         write_unlock_irq(&global_state_lock);
1531 }
1532
1533 /* caller must hold global_state_lock */
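/* Validate a proposed resync-after dependency on minor o_minor:
 * reject out-of-range minors and dependency cycles. */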
1534 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1535 {
1536         struct drbd_device *odev;
1537         int resync_after;
1538
1539         if (o_minor == -1)
1540                 return NO_ERROR;
1541         if (o_minor < -1 || o_minor > MINORMASK)
1542                 return ERR_RESYNC_AFTER;
1543
1544         /* check for loops */
1545         odev = minor_to_device(o_minor);
1546         while (1) {
1547                 if (odev == device)
1548                         return ERR_RESYNC_AFTER_CYCLE;
1549
1550                 /* You are free to depend on diskless, non-existing,
1551                  * or not yet/no longer existing minors.
1552                  * We only reject dependency loops.
1553                  * We cannot follow the dependency chain beyond a detached or
1554                  * missing minor.
1555                  */
1556                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1557                         return NO_ERROR;
1558
1559                 rcu_read_lock();
1560                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1561                 rcu_read_unlock();
1562                 /* dependency chain ends here, no cycles. */
1563                 if (resync_after == -1)
1564                         return NO_ERROR;
1565
1566                 /* follow the dependency chain */
1567                 odev = minor_to_device(resync_after);
1568         }
1569 }
1570
1571 /* caller must hold global_state_lock */
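/* Re-evaluate which devices must pause and which may resume their resync,
 * repeating until the pause/resume state of all devices is stable. */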
1572 void drbd_resync_after_changed(struct drbd_device *device)
1573 {
1574         int changes;
1575
1576         do {
1577                 changes  = _drbd_pause_after(device);
1578                 changes |= _drbd_resume_next(device);
1579         } while (changes);
1580 }
1581
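/* Reset the dynamic resync controller: clear the in-flight and event
 * counters and empty the fifo plan. */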
1582 void drbd_rs_controller_reset(struct drbd_device *device)
1583 {
1584         struct fifo_buffer *plan;
1585
1586         atomic_set(&device->rs_sect_in, 0);
1587         atomic_set(&device->rs_sect_ev, 0);
1588         device->rs_in_flight = 0;
1589
1590         /* Updating the RCU protected object in place is necessary since
1591            this function gets called from atomic context.
1592            It is valid since all other updates also lead to a completely
1593            empty fifo */
1594         rcu_read_lock();
1595         plan = rcu_dereference(device->rs_plan_s);
1596         plan->total = 0;
1597         fifo_set(plan, 0);
1598         rcu_read_unlock();
1599 }
1600
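/* Timer callback: defer the actual resync start to the worker by queueing
 * start_resync_work on the connection's sender_work. */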
1601 void start_resync_timer_fn(unsigned long data)
1602 {
1603         struct drbd_device *device = (struct drbd_device *) data;
1604
1605         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
1606                         &device->start_resync_work);
1607 }
1608
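/* Worker callback: if acks or resync requests are still pending, retry a
 * bit later; otherwise leave AHEAD mode and start the resync as sync source. */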
1609 int w_start_resync(struct drbd_work *w, int cancel)
1610 {
1611         struct drbd_device *device =
1612                 container_of(w, struct drbd_device, start_resync_work);
1613
1614         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1615                 drbd_warn(device, "w_start_resync later...\n");
1616                 device->start_resync_timer.expires = jiffies + HZ/10;
1617                 add_timer(&device->start_resync_timer);
1618                 return 0;
1619         }
1620
1621         drbd_start_resync(device, C_SYNC_SOURCE);
1622         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1623         return 0;
1624 }
1625
1626 /**
1627  * drbd_start_resync() - Start the resync process
1628  * @device:     DRBD device.
1629  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1630  *
1631  * This function might bring you directly into one of the
1632  * C_PAUSED_SYNC_* states.
1633  */
1634 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1635 {
1636         union drbd_state ns;
1637         int r;
1638
1639         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1640                 drbd_err(device, "Resync already running!\n");
1641                 return;
1642         }
1643
1644         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1645                 if (side == C_SYNC_TARGET) {
1646                         /* Since application IO was locked out during C_WF_BITMAP_T and
1647                            C_WF_SYNC_UUID, our data is still unmodified. Before going to C_SYNC_TARGET,
1648                            give the before-resync-target handler a chance to veto the resync that would make our data inconsistent. */
1649                         r = drbd_khelper(device, "before-resync-target");
1650                         r = (r >> 8) & 0xff;
1651                         if (r > 0) {
1652                                 drbd_info(device, "before-resync-target handler returned %d, "
1653                                          "dropping connection.\n", r);
1654                                 conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
1655                                 return;
1656                         }
1657                 } else /* C_SYNC_SOURCE */ {
1658                         r = drbd_khelper(device, "before-resync-source");
1659                         r = (r >> 8) & 0xff;
1660                         if (r > 0) {
1661                                 if (r == 3) {
1662                                         drbd_info(device, "before-resync-source handler returned %d, "
1663                                                  "ignoring. Old userland tools?\n", r);
1664                                 } else {
1665                                         drbd_info(device, "before-resync-source handler returned %d, "
1666                                                  "dropping connection.\n", r);
1667                                         conn_request_state(first_peer_device(device)->connection,
1668                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1669                                         return;
1670                                 }
1671                         }
1672                 }
1673         }
1674
1675         if (current == first_peer_device(device)->connection->worker.task) {
1676                 /* The worker should not sleep waiting for state_mutex,
1677                    as that can take a long time */
1678                 if (!mutex_trylock(device->state_mutex)) {
1679                         set_bit(B_RS_H_DONE, &device->flags);
1680                         device->start_resync_timer.expires = jiffies + HZ/5;
1681                         add_timer(&device->start_resync_timer);
1682                         return;
1683                 }
1684         } else {
1685                 mutex_lock(device->state_mutex);
1686         }
1687         clear_bit(B_RS_H_DONE, &device->flags);
1688
1689         /* req_lock: serialize with drbd_send_and_submit() and others
1690          * global_state_lock: for stable sync-after dependencies */
1691         spin_lock_irq(&device->resource->req_lock);
1692         write_lock(&global_state_lock);
1693         /* Did some connection breakage or IO error race with us? */
1694         if (device->state.conn < C_CONNECTED
1695         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1696                 write_unlock(&global_state_lock);
1697                 spin_unlock_irq(&device->resource->req_lock);
1698                 mutex_unlock(device->state_mutex);
1699                 return;
1700         }
1701
1702         ns = drbd_read_state(device);
1703
1704         ns.aftr_isp = !_drbd_may_sync_now(device);
1705
1706         ns.conn = side;
1707
1708         if (side == C_SYNC_TARGET)
1709                 ns.disk = D_INCONSISTENT;
1710         else /* side == C_SYNC_SOURCE */
1711                 ns.pdsk = D_INCONSISTENT;
1712
1713         r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1714         ns = drbd_read_state(device);
1715
1716         if (ns.conn < C_CONNECTED)
1717                 r = SS_UNKNOWN_ERROR;
1718
1719         if (r == SS_SUCCESS) {
1720                 unsigned long tw = drbd_bm_total_weight(device);
1721                 unsigned long now = jiffies;
1722                 int i;
1723
1724                 device->rs_failed    = 0;
1725                 device->rs_paused    = 0;
1726                 device->rs_same_csum = 0;
1727                 device->rs_last_events = 0;
1728                 device->rs_last_sect_ev = 0;
1729                 device->rs_total     = tw;
1730                 device->rs_start     = now;
1731                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1732                         device->rs_mark_left[i] = tw;
1733                         device->rs_mark_time[i] = now;
1734                 }
1735                 _drbd_pause_after(device);
1736         }
1737         write_unlock(&global_state_lock);
1738         spin_unlock_irq(&device->resource->req_lock);
1739
1740         if (r == SS_SUCCESS) {
1741                 /* reset rs_last_bcast when a resync or verify is started,
1742                  * to deal with potential jiffies wrap. */
1743                 device->rs_last_bcast = jiffies - HZ;
1744
1745                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1746                      drbd_conn_str(ns.conn),
1747                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1748                      (unsigned long) device->rs_total);
1749                 if (side == C_SYNC_TARGET)
1750                         device->bm_resync_fo = 0;
1751
1752                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1753                  * with w_send_oos, or the sync target will get confused as to
1754          * how many bits to resync.  We cannot do that always, because for an
1755                  * empty resync and protocol < 95, we need to do it here, as we call
1756                  * drbd_resync_finished from here in that case.
1757                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1758                  * and from after_state_ch otherwise. */
1759                 if (side == C_SYNC_SOURCE &&
1760                     first_peer_device(device)->connection->agreed_pro_version < 96)
1761                         drbd_gen_and_send_sync_uuid(first_peer_device(device));
1762
1763                 if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
1764                     device->rs_total == 0) {
1765                         /* This still has a race (about when exactly the peers
1766                          * detect connection loss) that can lead to a full sync
1767                          * on next handshake. In 8.3.9 we fixed this with explicit
1768                          * resync-finished notifications, but the fix
1769                          * introduces a protocol change.  Sleeping for some
1770                          * time longer than the ping interval + timeout on the
1771                          * SyncSource, to give the SyncTarget the chance to
1772                          * detect connection loss, then waiting for a ping
1773                          * response (implicit in drbd_resync_finished) reduces
1774                          * the race considerably, but does not solve it. */
1775                         if (side == C_SYNC_SOURCE) {
1776                                 struct net_conf *nc;
1777                                 int timeo;
1778
1779                                 rcu_read_lock();
1780                                 nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
1781                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1782                                 rcu_read_unlock();
1783                                 schedule_timeout_interruptible(timeo);
1784                         }
1785                         drbd_resync_finished(device);
1786                 }
1787
1788                 drbd_rs_controller_reset(device);
1789                 /* ns.conn may already be != device->state.conn,
1790                  * we may have been paused in between, or become paused until
1791                  * the timer triggers.
1792                  * No matter, that is handled in resync_timer_fn() */
1793                 if (ns.conn == C_SYNC_TARGET)
1794                         mod_timer(&device->resync_timer, jiffies);
1795
1796                 drbd_md_sync(device);
1797         }
1798         put_ldev(device);
1799         mutex_unlock(device->state_mutex);
1800 }
1801
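/* Splice all currently queued work onto work_list in one go; returns true if
 * there is anything to do.  dequeue_work_item() below moves one item at a
 * time, which the main worker loop needs as long as drbd_queue_work_front()
 * is in use. */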
1802 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1803 {
1804         spin_lock_irq(&queue->q_lock);
1805         list_splice_init(&queue->q, work_list);
1806         spin_unlock_irq(&queue->q_lock);
1807         return !list_empty(work_list);
1808 }
1809
1810 static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1811 {
1812         spin_lock_irq(&queue->q_lock);
1813         if (!list_empty(&queue->q))
1814                 list_move(queue->q.next, work_list);
1815         spin_unlock_irq(&queue->q_lock);
1816         return !list_empty(work_list);
1817 }
1818
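/* Block until there is a work item to process.  While waiting, close the
 * current epoch with a barrier if it needs closing, and manage TCP corking
 * on the data socket. */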
1819 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1820 {
1821         DEFINE_WAIT(wait);
1822         struct net_conf *nc;
1823         int uncork, cork;
1824
1825         dequeue_work_item(&connection->sender_work, work_list);
1826         if (!list_empty(work_list))
1827                 return;
1828
1829         /* Still nothing to do?
1830          * Maybe we still need to close the current epoch,
1831          * even if no new requests are queued yet.
1832          *
1833          * Also, poke TCP, just in case.
1834          * Then wait for new work (or signal). */
1835         rcu_read_lock();
1836         nc = rcu_dereference(connection->net_conf);
1837         uncork = nc ? nc->tcp_cork : 0;
1838         rcu_read_unlock();
1839         if (uncork) {
1840                 mutex_lock(&connection->data.mutex);
1841                 if (connection->data.socket)
1842                         drbd_tcp_uncork(connection->data.socket);
1843                 mutex_unlock(&connection->data.mutex);
1844         }
1845
1846         for (;;) {
1847                 int send_barrier;
1848                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1849                 spin_lock_irq(&connection->resource->req_lock);
1850                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
1851                 /* dequeue single item only,
1852                  * we still use drbd_queue_work_front() in some places */
1853                 if (!list_empty(&connection->sender_work.q))
1854                         list_move(connection->sender_work.q.next, work_list);
1855                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
1856                 if (!list_empty(work_list) || signal_pending(current)) {
1857                         spin_unlock_irq(&connection->resource->req_lock);
1858                         break;
1859                 }
1860
1861                 /* We found nothing new to do, no to-be-communicated request,
1862                  * no other work item.  We may still need to close the last
1863                  * epoch.  Next incoming request epoch will be connection ->
1864                  * current transfer log epoch number.  If that is different
1865                  * from the epoch of the last request we communicated, it is
1866                  * safe to send the epoch separating barrier now.
1867                  */
1868                 send_barrier =
1869                         atomic_read(&connection->current_tle_nr) !=
1870                         connection->send.current_epoch_nr;
1871                 spin_unlock_irq(&connection->resource->req_lock);
1872
1873                 if (send_barrier)
1874                         maybe_send_barrier(connection,
1875                                         connection->send.current_epoch_nr + 1);
1876                 schedule();
1877                 /* We may be woken up for things other than new work, too,
1878                  * e.g. if the current epoch got closed.
1879                  * In that case we send the barrier above. */
1880         }
1881         finish_wait(&connection->sender_work.q_wait, &wait);
1882
1883         /* someone may have changed the config while we have been waiting above. */
1884         rcu_read_lock();
1885         nc = rcu_dereference(connection->net_conf);
1886         cork = nc ? nc->tcp_cork : 0;
1887         rcu_read_unlock();
1888         mutex_lock(&connection->data.mutex);
1889         if (connection->data.socket) {
1890                 if (cork)
1891                         drbd_tcp_cork(connection->data.socket);
1892                 else if (!uncork)
1893                         drbd_tcp_uncork(connection->data.socket);
1894         }
1895         mutex_unlock(&connection->data.mutex);
1896 }
1897
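/* Main loop of the per-connection worker thread: dequeue work items and run
 * their callbacks, passing cancel once the connection is going down.  On
 * exit, drain the remaining work with cancel=1 and clean up all devices. */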
1898 int drbd_worker(struct drbd_thread *thi)
1899 {
1900         struct drbd_connection *connection = thi->connection;
1901         struct drbd_work *w = NULL;
1902         struct drbd_peer_device *peer_device;
1903         LIST_HEAD(work_list);
1904         int vnr;
1905
1906         while (get_t_state(thi) == RUNNING) {
1907                 drbd_thread_current_set_cpu(thi);
1908
1909                 /* as long as we use drbd_queue_work_front(),
1910                  * we may only dequeue single work items here, not batches. */
1911                 if (list_empty(&work_list))
1912                         wait_for_work(connection, &work_list);
1913
1914                 if (signal_pending(current)) {
1915                         flush_signals(current);
1916                         if (get_t_state(thi) == RUNNING) {
1917                                 drbd_warn(connection, "Worker got an unexpected signal\n");
1918                                 continue;
1919                         }
1920                         break;
1921                 }
1922
1923                 if (get_t_state(thi) != RUNNING)
1924                         break;
1925
1926                 while (!list_empty(&work_list)) {
1927                         w = list_first_entry(&work_list, struct drbd_work, list);
1928                         list_del_init(&w->list);
1929                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
1930                                 continue;
1931                         if (connection->cstate >= C_WF_REPORT_PARAMS)
1932                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1933                 }
1934         }
1935
1936         do {
1937                 while (!list_empty(&work_list)) {
1938                         w = list_first_entry(&work_list, struct drbd_work, list);
1939                         list_del_init(&w->list);
1940                         w->cb(w, 1);
1941                 }
1942                 dequeue_work_batch(&connection->sender_work, &work_list);
1943         } while (!list_empty(&work_list));
1944
1945         rcu_read_lock();
1946         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1947                 struct drbd_device *device = peer_device->device;
1948                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
1949                 kref_get(&device->kref);
1950                 rcu_read_unlock();
1951                 drbd_device_cleanup(device);
1952                 kref_put(&device->kref, drbd_destroy_device);
1953                 rcu_read_lock();
1954         }
1955         rcu_read_unlock();
1956
1957         return 0;
1958 }