Use separate bitmaps for each nodes in the cluster
[firefly-linux-kernel-4.4.55.git] / drivers / md / bitmap.c
index b43a75a246e7d317af08a62b1b7979bdf88ed0cd..b1d94eee33469f976b40fdc4402cde7e4b26674b 100644 (file)
@@ -205,6 +205,10 @@ static int write_sb_page(struct bitmap *bitmap, struct page *page, int wait)
        struct block_device *bdev;
        struct mddev *mddev = bitmap->mddev;
        struct bitmap_storage *store = &bitmap->storage;
+       int node_offset = 0;
+
+       if (mddev_is_clustered(bitmap->mddev))
+               node_offset = bitmap->cluster_slot * store->file_pages;
 
        while ((rdev = next_active_rdev(rdev, mddev)) != NULL) {
                int size = PAGE_SIZE;
@@ -549,6 +553,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
        unsigned long sectors_reserved = 0;
        int err = -EINVAL;
        struct page *sb_page;
+       int cluster_setup_done = 0;
 
        if (!bitmap->storage.file && !bitmap->mddev->bitmap_info.offset) {
                chunksize = 128 * 1024 * 1024;
@@ -564,6 +569,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
                return -ENOMEM;
        bitmap->storage.sb_page = sb_page;
 
+re_read:
        if (bitmap->storage.file) {
                loff_t isize = i_size_read(bitmap->storage.file->f_mapping->host);
                int bytes = isize > PAGE_SIZE ? PAGE_SIZE : isize;
@@ -579,6 +585,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
        if (err)
                return err;
 
+       err = -EINVAL;
        sb = kmap_atomic(sb_page);
 
        chunksize = le32_to_cpu(sb->chunksize);
@@ -586,6 +593,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
        write_behind = le32_to_cpu(sb->write_behind);
        sectors_reserved = le32_to_cpu(sb->sectors_reserved);
        nodes = le32_to_cpu(sb->nodes);
+       strlcpy(bitmap->mddev->bitmap_info.cluster_name, sb->cluster_name, 64);
 
        /* verify that the bitmap-specific fields are valid */
        if (sb->magic != cpu_to_le32(BITMAP_MAGIC))
@@ -622,7 +630,7 @@ static int bitmap_read_sb(struct bitmap *bitmap)
                        goto out;
                }
                events = le64_to_cpu(sb->events);
-               if (events < bitmap->mddev->events) {
+               if (!nodes && (events < bitmap->mddev->events)) {
                        printk(KERN_INFO
                               "%s: bitmap file is out of date (%llu < %llu) "
                               "-- forcing full recovery\n",
@@ -639,8 +647,34 @@ static int bitmap_read_sb(struct bitmap *bitmap)
        bitmap->events_cleared = le64_to_cpu(sb->events_cleared);
        strlcpy(bitmap->mddev->bitmap_info.cluster_name, sb->cluster_name, 64);
        err = 0;
+
 out:
        kunmap_atomic(sb);
+       if (nodes && !cluster_setup_done) {
+               sector_t bm_blocks;
+
+               bm_blocks = sector_div(bitmap->mddev->resync_max_sectors, (chunksize >> 9));
+               bm_blocks = bm_blocks << 3;
+               /* We have bitmap supers at 4k boundaries, hence this
+                * is hardcoded */
+               bm_blocks = DIV_ROUND_UP(bm_blocks, 4096);
+               err = md_setup_cluster(bitmap->mddev, nodes);
+               if (err) {
+                       pr_err("%s: Could not setup cluster service (%d)\n",
+                                       bmname(bitmap), err);
+                       goto out_no_sb;
+               }
+               bitmap->cluster_slot = md_cluster_ops->slot_number(bitmap->mddev);
+               bitmap->mddev->bitmap_info.offset +=
+                       bitmap->cluster_slot * (bm_blocks << 3);
+               pr_info("%s:%d bm slot: %d offset: %llu\n", __func__, __LINE__,
+                       bitmap->cluster_slot,
+                       (unsigned long long)bitmap->mddev->bitmap_info.offset);
+               cluster_setup_done = 1;
+               goto re_read;
+       }
+
+
 out_no_sb:
        if (test_bit(BITMAP_STALE, &bitmap->flags))
                bitmap->events_cleared = bitmap->mddev->events;
@@ -651,8 +685,11 @@ out_no_sb:
        if (bitmap->mddev->bitmap_info.space == 0 ||
            bitmap->mddev->bitmap_info.space > sectors_reserved)
                bitmap->mddev->bitmap_info.space = sectors_reserved;
-       if (err)
+       if (err) {
                bitmap_print_sb(bitmap);
+               if (cluster_setup_done)
+                       md_cluster_stop(bitmap->mddev);
+       }
        return err;
 }
 
@@ -697,9 +734,10 @@ static inline struct page *filemap_get_page(struct bitmap_storage *store,
 }
 
 static int bitmap_storage_alloc(struct bitmap_storage *store,
-                               unsigned long chunks, int with_super)
+                               unsigned long chunks, int with_super,
+                               int slot_number)
 {
-       int pnum;
+       int pnum, offset = 0;
        unsigned long num_pages;
        unsigned long bytes;
 
@@ -708,6 +746,7 @@ static int bitmap_storage_alloc(struct bitmap_storage *store,
                bytes += sizeof(bitmap_super_t);
 
        num_pages = DIV_ROUND_UP(bytes, PAGE_SIZE);
+       offset = slot_number * (num_pages - 1);
 
        store->filemap = kmalloc(sizeof(struct page *)
                                 * num_pages, GFP_KERNEL);
@@ -718,20 +757,22 @@ static int bitmap_storage_alloc(struct bitmap_storage *store,
                store->sb_page = alloc_page(GFP_KERNEL|__GFP_ZERO);
                if (store->sb_page == NULL)
                        return -ENOMEM;
-               store->sb_page->index = 0;
        }
+
        pnum = 0;
        if (store->sb_page) {
                store->filemap[0] = store->sb_page;
                pnum = 1;
+               store->sb_page->index = offset;
        }
+
        for ( ; pnum < num_pages; pnum++) {
                store->filemap[pnum] = alloc_page(GFP_KERNEL|__GFP_ZERO);
                if (!store->filemap[pnum]) {
                        store->file_pages = pnum;
                        return -ENOMEM;
                }
-               store->filemap[pnum]->index = pnum;
+               store->filemap[pnum]->index = pnum + offset;
        }
        store->file_pages = pnum;
 
@@ -940,7 +981,7 @@ static void bitmap_set_memory_bits(struct bitmap *bitmap, sector_t offset, int n
  */
 static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start)
 {
-       unsigned long i, chunks, index, oldindex, bit;
+       unsigned long i, chunks, index, oldindex, bit, node_offset = 0;
        struct page *page = NULL;
        unsigned long bit_cnt = 0;
        struct file *file;
@@ -986,6 +1027,9 @@ static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start)
        if (!bitmap->mddev->bitmap_info.external)
                offset = sizeof(bitmap_super_t);
 
+       if (mddev_is_clustered(bitmap->mddev))
+               node_offset = bitmap->cluster_slot * (DIV_ROUND_UP(store->bytes, PAGE_SIZE));
+
        for (i = 0; i < chunks; i++) {
                int b;
                index = file_page_index(&bitmap->storage, i);
@@ -1006,7 +1050,7 @@ static int bitmap_init_from_disk(struct bitmap *bitmap, sector_t start)
                                        bitmap->mddev,
                                        bitmap->mddev->bitmap_info.offset,
                                        page,
-                                       index, count);
+                                       index + node_offset, count);
 
                        if (ret)
                                goto err;
@@ -1212,7 +1256,6 @@ void bitmap_daemon_work(struct mddev *mddev)
             j < bitmap->storage.file_pages
                     && !test_bit(BITMAP_STALE, &bitmap->flags);
             j++) {
-
                if (test_page_attr(bitmap, j,
                                   BITMAP_PAGE_DIRTY))
                        /* bitmap_unplug will handle the rest */
@@ -1596,6 +1639,9 @@ static void bitmap_free(struct bitmap *bitmap)
        if (!bitmap) /* there was no bitmap */
                return;
 
+       if (mddev_is_clustered(bitmap->mddev) && bitmap->mddev->cluster_info)
+               md_cluster_stop(bitmap->mddev);
+
        /* Shouldn't be needed - but just in case.... */
        wait_event(bitmap->write_wait,
                   atomic_read(&bitmap->pending_writes) == 0);
@@ -1854,7 +1900,8 @@ int bitmap_resize(struct bitmap *bitmap, sector_t blocks,
        memset(&store, 0, sizeof(store));
        if (bitmap->mddev->bitmap_info.offset || bitmap->mddev->bitmap_info.file)
                ret = bitmap_storage_alloc(&store, chunks,
-                                          !bitmap->mddev->bitmap_info.external);
+                                          !bitmap->mddev->bitmap_info.external,
+                                          bitmap->cluster_slot);
        if (ret)
                goto err;