rbd: more symbol renames
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
index a6278e7e61a00bfde01bcb6726524ffb3a06514e..2fe160014f58a40b1036905fe4b941bbaf027f7a 100644 (file)
 
 #include "rbd_types.h"
 
-#define DRV_NAME "rbd"
-#define DRV_NAME_LONG "rbd (rados block device)"
+/*
+ * The basic unit of block I/O is a sector.  It is interpreted in a
+ * number of contexts in Linux (blk, bio, genhd), but the default is
+ * universally 512 bytes.  These symbols are just slightly more
+ * meaningful than the bare numbers they represent.
+ */
+#define        SECTOR_SHIFT    9
+#define        SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
+
+#define RBD_DRV_NAME "rbd"
+#define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
 #define RBD_MINORS_PER_MAJOR   256             /* max minors per blkdev */
 
-#define RBD_MAX_MD_NAME_LEN    (96 + sizeof(RBD_SUFFIX))
-#define RBD_MAX_POOL_NAME_LEN  64
 #define RBD_MAX_SNAP_NAME_LEN  32
 #define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+/*
+ * An RBD device name will be "rbd#", where the "rbd" comes from
+ * RBD_DRV_NAME above, and # is a unique integer identifier.
+ * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
+ * enough to hold all possible device names.
+ */
 #define DEV_NAME_LEN           32
+#define MAX_INT_FORMAT_WIDTH   ((5 * sizeof (int)) / 2 + 1)
 
 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
 
  */
 struct rbd_image_header {
        u64 image_size;
-       char block_name[32];
+       char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
-       struct rw_semaphore snap_rwsem;
        struct ceph_snap_context *snapc;
        size_t snap_names_len;
        u64 snap_seq;
@@ -83,7 +96,7 @@ struct rbd_options {
 };
 
 /*
- * an instance of the client.  multiple devices may share a client.
+ * an instance of the client.  multiple devices may share an rbd client.
  */
 struct rbd_client {
        struct ceph_client      *client;
@@ -92,20 +105,9 @@ struct rbd_client {
        struct list_head        node;
 };
 
-struct rbd_req_coll;
-
 /*
- * a single io request
+ * a request completion status
  */
-struct rbd_request {
-       struct request          *rq;            /* blk layer request */
-       struct bio              *bio;           /* cloned bio */
-       struct page             **pages;        /* list of used pages */
-       u64                     len;
-       int                     coll_index;
-       struct rbd_req_coll     *coll;
-};
-
 struct rbd_req_status {
        int done;
        int rc;
@@ -122,10 +124,22 @@ struct rbd_req_coll {
        struct rbd_req_status   status[0];
 };
 
+/*
+ * a single io request
+ */
+struct rbd_request {
+       struct request          *rq;            /* blk layer request */
+       struct bio              *bio;           /* cloned bio */
+       struct page             **pages;        /* list of used pages */
+       u64                     len;
+       int                     coll_index;
+       struct rbd_req_coll     *coll;
+};
+
 struct rbd_snap {
        struct  device          dev;
        const char              *name;
-       size_t                  size;
+       u64                     size;
        struct list_head        node;
        u64                     id;
 };
@@ -140,7 +154,6 @@ struct rbd_device {
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
        struct request_queue    *q;
 
-       struct ceph_client      *client;
        struct rbd_client       *rbd_client;
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -148,18 +161,19 @@ struct rbd_device {
        spinlock_t              lock;           /* queue lock */
 
        struct rbd_image_header header;
-       char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
-       int                     obj_len;
-       char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
-       char                    pool_name[RBD_MAX_POOL_NAME_LEN];
-       int                     poolid;
+       char                    *image_name;
+       size_t                  image_name_len;
+       char                    *header_name;
+       char                    *pool_name;
+       int                     pool_id;
 
        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;
 
-       char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
-       u32 cur_snap;   /* index+1 of current snapshot within snap context
-                          0 - for the head */
+       /* protects updating the header */
+       struct rw_semaphore     header_rwsem;
+       char                    *snap_name;
+       u64                     snap_id;        /* current snapshot id */
        int read_only;
 
        struct list_head        node;
@@ -171,15 +185,13 @@ struct rbd_device {
        struct device           dev;
 };
 
-static struct bus_type rbd_bus_type = {
-       .name           = "rbd",
-};
-
-static spinlock_t node_lock;      /* protects client get/put */
-
 static DEFINE_MUTEX(ctl_mutex);          /* Serialize open/close/setup/teardown */
+
 static LIST_HEAD(rbd_dev_list);    /* devices */
-static LIST_HEAD(rbd_client_list);      /* clients */
+static DEFINE_SPINLOCK(rbd_dev_list_lock);
+
+static LIST_HEAD(rbd_client_list);             /* clients */
+static DEFINE_SPINLOCK(rbd_client_list_lock);
 
 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
 static void rbd_dev_release(struct device *dev);
@@ -190,12 +202,32 @@ static ssize_t rbd_snap_add(struct device *dev,
 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
                                  struct rbd_snap *snap);
 
+static ssize_t rbd_add(struct bus_type *bus, const char *buf,
+                      size_t count);
+static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
+                         size_t count);
+
+static struct bus_attribute rbd_bus_attrs[] = {
+       __ATTR(add, S_IWUSR, NULL, rbd_add),
+       __ATTR(remove, S_IWUSR, NULL, rbd_remove),
+       __ATTR_NULL
+};
+
+static struct bus_type rbd_bus_type = {
+       .name           = "rbd",
+       .bus_attrs      = rbd_bus_attrs,
+};
 
-static struct rbd_device *dev_to_rbd(struct device *dev)
+static void rbd_root_dev_release(struct device *dev)
 {
-       return container_of(dev, struct rbd_device, dev);
 }
 
+static struct device rbd_root_dev = {
+       .init_name =    "rbd",
+       .release =      rbd_root_dev_release,
+};
+
+
 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 {
        return get_device(&rbd_dev->dev);
@@ -206,12 +238,11 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
        put_device(&rbd_dev->dev);
 }
 
-static int __rbd_update_snaps(struct rbd_device *rbd_dev);
+static int __rbd_refresh_header(struct rbd_device *rbd_dev);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
-       struct gendisk *disk = bdev->bd_disk;
-       struct rbd_device *rbd_dev = disk->private_data;
+       struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 
        rbd_get_dev(rbd_dev);
 
@@ -256,9 +287,11 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);
 
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+
        rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
-               goto out_rbdc;
+               goto out_mutex;
        opt = NULL; /* Now rbdc->client is responsible for opt */
 
        ret = ceph_open_session(rbdc->client);
@@ -267,16 +300,19 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt,
 
        rbdc->rbd_opts = rbd_opts;
 
-       spin_lock(&node_lock);
+       spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
-       spin_unlock(&node_lock);
+       spin_unlock(&rbd_client_list_lock);
+
+       mutex_unlock(&ctl_mutex);
 
        dout("rbd_client_create created %p\n", rbdc);
        return rbdc;
 
 out_err:
        ceph_destroy_client(rbdc->client);
-out_rbdc:
+out_mutex:
+       mutex_unlock(&ctl_mutex);
        kfree(rbdc);
 out_opt:
        if (opt)
@@ -324,7 +360,7 @@ static int parse_rbd_opts_token(char *c, void *private)
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;
 
-       token = match_token((char *)c, rbdopt_tokens, argstr);
+       token = match_token(c, rbdopt_tokens, argstr);
        if (token < 0)
                return -EINVAL;
 
@@ -357,65 +393,63 @@ static int parse_rbd_opts_token(char *c, void *private)
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
-                         char *options)
+static struct rbd_client *rbd_get_client(const char *mon_addr,
+                                        size_t mon_addr_len,
+                                        char *options)
 {
        struct rbd_client *rbdc;
        struct ceph_options *opt;
-       int ret;
        struct rbd_options *rbd_opts;
 
        rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
-               return -ENOMEM;
+               return ERR_PTR(-ENOMEM);
 
        rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 
-       ret = ceph_parse_options(&opt, options, mon_addr,
-                                mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
-       if (ret < 0)
-               goto done_err;
+       opt = ceph_parse_options(options, mon_addr,
+                               mon_addr + mon_addr_len,
+                               parse_rbd_opts_token, rbd_opts);
+       if (IS_ERR(opt)) {
+               kfree(rbd_opts);
+               return ERR_CAST(opt);
+       }
 
-       spin_lock(&node_lock);
+       spin_lock(&rbd_client_list_lock);
        rbdc = __rbd_client_find(opt);
        if (rbdc) {
+               /* using an existing client */
+               kref_get(&rbdc->kref);
+               spin_unlock(&rbd_client_list_lock);
+
                ceph_destroy_options(opt);
                kfree(rbd_opts);
 
-               /* using an existing client */
-               kref_get(&rbdc->kref);
-               rbd_dev->rbd_client = rbdc;
-               rbd_dev->client = rbdc->client;
-               spin_unlock(&node_lock);
-               return 0;
+               return rbdc;
        }
-       spin_unlock(&node_lock);
+       spin_unlock(&rbd_client_list_lock);
 
        rbdc = rbd_client_create(opt, rbd_opts);
-       if (IS_ERR(rbdc)) {
-               ret = PTR_ERR(rbdc);
-               goto done_err;
-       }
 
-       rbd_dev->rbd_client = rbdc;
-       rbd_dev->client = rbdc->client;
-       return 0;
-done_err:
-       kfree(rbd_opts);
-       return ret;
+       if (IS_ERR(rbdc))
+               kfree(rbd_opts);
+
+       return rbdc;
 }
 
 /*
  * Destroy ceph client
  *
- * Caller must hold node_lock.
+ * Takes rbd_client_list_lock itself, so the caller must not hold it.
  */
 static void rbd_client_release(struct kref *kref)
 {
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 
        dout("rbd_release_client %p\n", rbdc);
+       spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
+       spin_unlock(&rbd_client_list_lock);
 
        ceph_destroy_client(rbdc->client);
        kfree(rbdc->rbd_opts);
@@ -428,11 +462,8 @@ static void rbd_client_release(struct kref *kref)
  */
 static void rbd_put_client(struct rbd_device *rbd_dev)
 {
-       spin_lock(&node_lock);
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
-       spin_unlock(&node_lock);
        rbd_dev->rbd_client = NULL;
-       rbd_dev->client = NULL;
 }
 
 /*
@@ -453,40 +484,47 @@ static void rbd_coll_release(struct kref *kref)
  */
 static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk,
-                                int allocated_snaps,
+                                u32 allocated_snaps,
                                 gfp_t gfp_flags)
 {
-       int i;
-       u32 snap_count = le32_to_cpu(ondisk->snap_count);
-       int ret = -ENOMEM;
+       u32 i, snap_count;
 
-       if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) {
+       if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
                return -ENXIO;
-       }
 
-       init_rwsem(&header->snap_rwsem);
-       header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+       snap_count = le32_to_cpu(ondisk->snap_count);
+       if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
+                        / sizeof (*ondisk))
+               return -EINVAL;
        header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
-                               snap_count *
-                                sizeof(struct rbd_image_snap_ondisk),
+                               snap_count * sizeof(u64),
                                gfp_flags);
        if (!header->snapc)
                return -ENOMEM;
+
+       header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
        if (snap_count) {
                header->snap_names = kmalloc(header->snap_names_len,
-                                            GFP_KERNEL);
+                                            gfp_flags);
                if (!header->snap_names)
                        goto err_snapc;
                header->snap_sizes = kmalloc(snap_count * sizeof(u64),
-                                            GFP_KERNEL);
+                                            gfp_flags);
                if (!header->snap_sizes)
                        goto err_names;
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }
-       memcpy(header->block_name, ondisk->block_name,
+
+       header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
+                                       gfp_flags);
+       if (!header->object_prefix)
+               goto err_sizes;
+
+       memcpy(header->object_prefix, ondisk->block_name,
               sizeof(ondisk->block_name));
+       header->object_prefix[sizeof (ondisk->block_name)] = '\0';
 
        header->image_size = le64_to_cpu(ondisk->image_size);
        header->obj_order = ondisk->options.order;
@@ -498,8 +536,7 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
        header->snapc->num_snaps = snap_count;
        header->total_snaps = snap_count;
 
-       if (snap_count &&
-           allocated_snaps == snap_count) {
+       if (snap_count && allocated_snaps == snap_count) {
                for (i = 0; i < snap_count; i++) {
                        header->snapc->snaps[i] =
                                le64_to_cpu(ondisk->snaps[i].id);
@@ -514,26 +551,13 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
 
        return 0;
 
+err_sizes:
+       kfree(header->snap_sizes);
 err_names:
        kfree(header->snap_names);
 err_snapc:
        kfree(header->snapc);
-       return ret;
-}
-
-static int snap_index(struct rbd_image_header *header, int snap_num)
-{
-       return header->total_snaps - snap_num;
-}
-
-static u64 cur_snap_id(struct rbd_device *rbd_dev)
-{
-       struct rbd_image_header *header = &rbd_dev->header;
-
-       if (!rbd_dev->cur_snap)
-               return 0;
-
-       return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
+       return -ENOMEM;
 }
 
 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
@@ -542,70 +566,68 @@ static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
        int i;
        char *p = header->snap_names;
 
-       for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
-               if (strcmp(snap_name, p) == 0)
-                       break;
-       }
-       if (i == header->total_snaps)
-               return -ENOENT;
-       if (seq)
-               *seq = header->snapc->snaps[i];
+       for (i = 0; i < header->total_snaps; i++) {
+               if (!strcmp(snap_name, p)) {
 
-       if (size)
-               *size = header->snap_sizes[i];
+                       /* Found it.  Pass back its id and/or size */
 
-       return i;
+                       if (seq)
+                               *seq = header->snapc->snaps[i];
+                       if (size)
+                               *size = header->snap_sizes[i];
+                       return i;
+               }
+               p += strlen(p) + 1;     /* Skip ahead to the next name */
+       }
+       return -ENOENT;
 }
 
-static int rbd_header_set_snap(struct rbd_device *dev,
-                              const char *snap_name,
-                              u64 *size)
+static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
 {
-       struct rbd_image_header *header = &dev->header;
+       struct rbd_image_header *header = &rbd_dev->header;
        struct ceph_snap_context *snapc = header->snapc;
        int ret = -ENOENT;
 
-       down_write(&header->snap_rwsem);
+       down_write(&rbd_dev->header_rwsem);
 
-       if (!snap_name ||
-           !*snap_name ||
-           strcmp(snap_name, "-") == 0 ||
-           strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
+       if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
+                   sizeof (RBD_SNAP_HEAD_NAME))) {
                if (header->total_snaps)
                        snapc->seq = header->snap_seq;
                else
                        snapc->seq = 0;
-               dev->cur_snap = 0;
-               dev->read_only = 0;
+               rbd_dev->snap_id = CEPH_NOSNAP;
+               rbd_dev->read_only = 0;
                if (size)
                        *size = header->image_size;
        } else {
-               ret = snap_by_name(header, snap_name, &snapc->seq, size);
+               ret = snap_by_name(header, rbd_dev->snap_name,
+                                       &snapc->seq, size);
                if (ret < 0)
                        goto done;
-
-               dev->cur_snap = header->total_snaps - ret;
-               dev->read_only = 1;
+               rbd_dev->snap_id = snapc->seq;
+               rbd_dev->read_only = 1;
        }
 
        ret = 0;
 done:
-       up_write(&header->snap_rwsem);
+       up_write(&rbd_dev->header_rwsem);
        return ret;
 }
 
 static void rbd_header_free(struct rbd_image_header *header)
 {
-       kfree(header->snapc);
-       kfree(header->snap_names);
+       kfree(header->object_prefix);
        kfree(header->snap_sizes);
+       kfree(header->snap_names);
+       kfree(header->snapc);
 }
 
 /*
  * get the actual striped segment name, offset and length
  */
 static u64 rbd_get_segment(struct rbd_image_header *header,
-                          const char *block_name,
+                          const char *object_prefix,
                           u64 ofs, u64 len,
                           char *seg_name, u64 *segofs)
 {
@@ -613,7 +635,7 @@ static u64 rbd_get_segment(struct rbd_image_header *header,
 
        if (seg_name)
                snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
-                        "%s.%012llx", block_name, seg);
+                        "%s.%012llx", object_prefix, seg);
 
        ofs = ofs & ((1 << header->obj_order) - 1);
        len = min_t(u64, len, (1 << header->obj_order) - ofs);
@@ -717,7 +739,7 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
 
                        /* split the bio. We'll release it either in the next
                           call, or it will have to be released outside */
-                       bp = bio_split(old_chain, (len - total) / 512ULL);
+                       bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;
 
@@ -833,10 +855,10 @@ static void rbd_coll_end_req(struct rbd_request *req,
  * Send ceph osd request
  */
 static int rbd_do_request(struct request *rq,
-                         struct rbd_device *dev,
+                         struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
-                         const char *obj, u64 ofs, u64 len,
+                         const char *object_name, u64 ofs, u64 len,
                          struct bio *bio,
                          struct page **pages,
                          int num_pages,
@@ -857,7 +879,7 @@ static int rbd_do_request(struct request *rq,
        struct timespec mtime = CURRENT_TIME;
        struct rbd_request *req_data;
        struct ceph_osd_request_head *reqhead;
-       struct rbd_image_header *header = &dev->header;
+       struct ceph_osd_client *osdc;
 
        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
        if (!req_data) {
@@ -872,17 +894,16 @@ static int rbd_do_request(struct request *rq,
                req_data->coll_index = coll_index;
        }
 
-       dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
+       dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
+               object_name, len, ofs);
 
-       down_read(&header->snap_rwsem);
+       down_read(&rbd_dev->header_rwsem);
 
-       req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
-                                     snapc,
-                                     ops,
-                                     false,
-                                     GFP_NOIO, pages, bio);
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
+                                       false, GFP_NOIO, pages, bio);
        if (!req) {
-               up_read(&header->snap_rwsem);
+               up_read(&rbd_dev->header_rwsem);
                ret = -ENOMEM;
                goto done_pages;
        }
@@ -899,7 +920,7 @@ static int rbd_do_request(struct request *rq,
        reqhead = req->r_request->front.iov_base;
        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
 
-       strncpy(req->r_oid, obj, sizeof(req->r_oid));
+       strncpy(req->r_oid, object_name, sizeof(req->r_oid));
        req->r_oid_len = strlen(req->r_oid);
 
        layout = &req->r_file_layout;
@@ -907,29 +928,28 @@ static int rbd_do_request(struct request *rq,
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_pg_preferred = cpu_to_le32(-1);
-       layout->fl_pg_pool = cpu_to_le32(dev->poolid);
-       ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
-                            ofs, &len, &bno, req, ops);
+       layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
+       ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+                               req, ops);
 
        ceph_osdc_build_request(req, ofs, &len,
                                ops,
                                snapc,
                                &mtime,
                                req->r_oid, req->r_oid_len);
-       up_read(&header->snap_rwsem);
+       up_read(&rbd_dev->header_rwsem);
 
        if (linger_req) {
-               ceph_osdc_set_request_linger(&dev->client->osdc, req);
+               ceph_osdc_set_request_linger(osdc, req);
                *linger_req = req;
        }
 
-       ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
+       ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto done_err;
 
        if (!rbd_cb) {
-               ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+               ret = ceph_osdc_wait_request(osdc, req);
                if (ver)
                        *ver = le64_to_cpu(req->r_reassert_version.version);
                dout("reassert_ver=%lld\n",
@@ -965,7 +985,7 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
        op = (void *)(replyhead + 1);
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
-       read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
+       read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
 
        dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
 
@@ -994,14 +1014,14 @@ static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg
 /*
  * Do a synchronous ceph osd operation
  */
-static int rbd_req_sync_op(struct rbd_device *dev,
+static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int opcode,
                           int flags,
                           struct ceph_osd_req_op *orig_ops,
                           int num_reply,
-                          const char *obj,
+                          const char *object_name,
                           u64 ofs, u64 len,
                           char *buf,
                           struct ceph_osd_request **linger_req,
@@ -1031,8 +1051,8 @@ static int rbd_req_sync_op(struct rbd_device *dev,
                }
        }
 
-       ret = rbd_do_request(NULL, dev, snapc, snapid,
-                         obj, ofs, len, NULL,
+       ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
+                         object_name, ofs, len, NULL,
                          pages, num_pages,
                          flags,
                          ops,
@@ -1058,7 +1078,7 @@ done:
  * Do an asynchronous ceph osd operation
  */
 static int rbd_do_op(struct request *rq,
-                    struct rbd_device *rbd_dev ,
+                    struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
                     u64 snapid,
                     int opcode, int flags, int num_reply,
@@ -1079,7 +1099,7 @@ static int rbd_do_op(struct request *rq,
                return -ENOMEM;
 
        seg_len = rbd_get_segment(&rbd_dev->header,
-                                 rbd_dev->header.block_name,
+                                 rbd_dev->header.object_prefix,
                                  ofs, len,
                                  seg_name, &seg_ofs);
 
@@ -1140,7 +1160,7 @@ static int rbd_req_read(struct request *rq,
                         int coll_index)
 {
        return rbd_do_op(rq, rbd_dev, NULL,
-                        (snapid ? snapid : CEPH_NOSNAP),
+                        snapid,
                         CEPH_OSD_OP_READ,
                         CEPH_OSD_FLAG_READ,
                         2,
@@ -1150,45 +1170,44 @@ static int rbd_req_read(struct request *rq,
 /*
  * Request sync osd read
  */
-static int rbd_req_sync_read(struct rbd_device *dev,
+static int rbd_req_sync_read(struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
-                         const char *obj,
+                         const char *object_name,
                          u64 ofs, u64 len,
                          char *buf,
                          u64 *ver)
 {
-       return rbd_req_sync_op(dev, NULL,
-                              (snapid ? snapid : CEPH_NOSNAP),
+       return rbd_req_sync_op(rbd_dev, NULL,
+                              snapid,
                               CEPH_OSD_OP_READ,
                               CEPH_OSD_FLAG_READ,
                               NULL,
-                              1, obj, ofs, len, buf, NULL, ver);
+                              1, object_name, ofs, len, buf, NULL, ver);
 }
 
 /*
  * Request sync osd watch
  */
-static int rbd_req_sync_notify_ack(struct rbd_device *dev,
+static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id,
-                                  const char *obj)
+                                  const char *object_name)
 {
        struct ceph_osd_req_op *ops;
-       struct page **pages = NULL;
        int ret;
 
        ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (ret < 0)
                return ret;
 
-       ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
+       ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;
 
-       ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
-                         obj, 0, 0, NULL,
-                         pages, 0,
+       ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
+                         object_name, 0, 0, NULL,
+                         NULL, 0,
                          CEPH_OSD_FLAG_READ,
                          ops,
                          1,
@@ -1201,54 +1220,54 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
 
 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
-       struct rbd_device *dev = (struct rbd_device *)data;
+       struct rbd_device *rbd_dev = (struct rbd_device *)data;
        int rc;
 
-       if (!dev)
+       if (!rbd_dev)
                return;
 
-       dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
-               notify_id, (int)opcode);
+       dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
+               rbd_dev->header_name, notify_id, (int) opcode);
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       rc = __rbd_update_snaps(dev);
+       rc = __rbd_refresh_header(rbd_dev);
        mutex_unlock(&ctl_mutex);
        if (rc)
-               pr_warning(DRV_NAME "%d got notification but failed to update"
-                          " snaps: %d\n", dev->major, rc);
+               pr_warning(RBD_DRV_NAME "%d got notification but failed to "
+                          " update snaps: %d\n", rbd_dev->major, rc);
 
-       rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
+       rbd_req_sync_notify_ack(rbd_dev, ver, notify_id, rbd_dev->header_name);
 }
 
 /*
  * Request sync osd watch
  */
-static int rbd_req_sync_watch(struct rbd_device *dev,
-                             const char *obj,
+static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
+                             const char *object_name,
                              u64 ver)
 {
        struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &dev->client->osdc;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 
        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
        if (ret < 0)
                return ret;
 
        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
-                                    (void *)dev, &dev->watch_event);
+                                    (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;
 
        ops[0].watch.ver = cpu_to_le64(ver);
-       ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
+       ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;
 
-       ret = rbd_req_sync_op(dev, NULL,
+       ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
-                             1, obj, 0, 0, NULL,
-                             &dev->watch_request, NULL);
+                             1, object_name, 0, 0, NULL,
+                             &rbd_dev->watch_request, NULL);
 
        if (ret < 0)
                goto fail_event;
@@ -1257,8 +1276,8 @@ static int rbd_req_sync_watch(struct rbd_device *dev,
        return 0;
 
 fail_event:
-       ceph_osdc_cancel_event(dev->watch_event);
-       dev->watch_event = NULL;
+       ceph_osdc_cancel_event(rbd_dev->watch_event);
+       rbd_dev->watch_event = NULL;
 fail:
        rbd_destroy_ops(ops);
        return ret;
@@ -1267,8 +1286,8 @@ fail:
 /*
  * Request sync osd unwatch
  */
-static int rbd_req_sync_unwatch(struct rbd_device *dev,
-                               const char *obj)
+static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
+                               const char *object_name)
 {
        struct ceph_osd_req_op *ops;
 
@@ -1277,44 +1296,45 @@ static int rbd_req_sync_unwatch(struct rbd_device *dev,
                return ret;
 
        ops[0].watch.ver = 0;
-       ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
+       ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;
 
-       ret = rbd_req_sync_op(dev, NULL,
+       ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              0,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
-                             1, obj, 0, 0, NULL, NULL, NULL);
+                             1, object_name, 0, 0, NULL, NULL, NULL);
 
        rbd_destroy_ops(ops);
-       ceph_osdc_cancel_event(dev->watch_event);
-       dev->watch_event = NULL;
+       ceph_osdc_cancel_event(rbd_dev->watch_event);
+       rbd_dev->watch_event = NULL;
        return ret;
 }
 
 struct rbd_notify_info {
-       struct rbd_device *dev;
+       struct rbd_device *rbd_dev;
 };
 
 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
-       struct rbd_device *dev = (struct rbd_device *)data;
-       if (!dev)
+       struct rbd_device *rbd_dev = (struct rbd_device *)data;
+       if (!rbd_dev)
                return;
 
-       dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
+       dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
+                               rbd_dev->header_name,
                notify_id, (int)opcode);
 }
 
 /*
  * Request sync osd notify
  */
-static int rbd_req_sync_notify(struct rbd_device *dev,
-                         const char *obj)
+static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
+                         const char *object_name)
 {
        struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &dev->client->osdc;
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_event *event;
        struct rbd_notify_info info;
        int payload_len = sizeof(u32) + sizeof(u32);
@@ -1324,7 +1344,7 @@ static int rbd_req_sync_notify(struct rbd_device *dev,
        if (ret < 0)
                return ret;
 
-       info.dev = dev;
+       info.rbd_dev = rbd_dev;
 
        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
                                     (void *)&info, &event);
@@ -1337,12 +1357,12 @@ static int rbd_req_sync_notify(struct rbd_device *dev,
        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
        ops[0].watch.timeout = 12;
 
-       ret = rbd_req_sync_op(dev, NULL,
+       ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
-                              1, obj, 0, 0, NULL, NULL, NULL);
+                              1, object_name, 0, 0, NULL, NULL, NULL);
        if (ret < 0)
                goto fail_event;
 
@@ -1361,36 +1381,36 @@ fail:
 /*
  * Request sync osd read
  */
-static int rbd_req_sync_exec(struct rbd_device *dev,
-                            const char *obj,
-                            const char *cls,
-                            const char *method,
+static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
+                            const char *object_name,
+                            const char *class_name,
+                            const char *method_name,
                             const char *data,
                             int len,
                             u64 *ver)
 {
        struct ceph_osd_req_op *ops;
-       int cls_len = strlen(cls);
-       int method_len = strlen(method);
+       int class_name_len = strlen(class_name);
+       int method_name_len = strlen(method_name);
        int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
-                                   cls_len + method_len + len);
+                                   class_name_len + method_name_len + len);
        if (ret < 0)
                return ret;
 
-       ops[0].cls.class_name = cls;
-       ops[0].cls.class_len = (__u8)cls_len;
-       ops[0].cls.method_name = method;
-       ops[0].cls.method_len = (__u8)method_len;
+       ops[0].cls.class_name = class_name;
+       ops[0].cls.class_len = (__u8) class_name_len;
+       ops[0].cls.method_name = method_name;
+       ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = data;
        ops[0].cls.indata_len = len;
 
-       ret = rbd_req_sync_op(dev, NULL,
+       ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
-                              1, obj, 0, 0, NULL, NULL, ver);
+                              1, object_name, 0, 0, NULL, NULL, ver);
 
        rbd_destroy_ops(ops);
 
@@ -1421,9 +1441,7 @@ static void rbd_rq_fn(struct request_queue *q)
        struct request *rq;
        struct bio_pair *bp = NULL;
 
-       rq = blk_fetch_request(q);
-
-       while (1) {
+       while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
@@ -1441,39 +1459,39 @@ static void rbd_rq_fn(struct request_queue *q)
                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
-                       goto next;
+                       continue;
                }
 
                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);
 
                size = blk_rq_bytes(rq);
-               ofs = blk_rq_pos(rq) * 512ULL;
+               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
-                       goto next;
+                       continue;
                }
 
                spin_unlock_irq(q->queue_lock);
 
                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
-                    size, blk_rq_pos(rq) * 512ULL);
+                    size, blk_rq_pos(rq) * SECTOR_SIZE);
 
                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
-                       goto next;
+                       continue;
                }
 
                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
                        op_size = rbd_get_segment(&rbd_dev->header,
-                                                 rbd_dev->header.block_name,
+                                                 rbd_dev->header.object_prefix,
                                                  ofs, size,
                                                  NULL, NULL);
                        kref_get(&coll->kref);
@@ -1495,7 +1513,7 @@ static void rbd_rq_fn(struct request_queue *q)
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
-                                            cur_snap_id(rbd_dev),
+                                            rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);
@@ -1512,8 +1530,6 @@ next_seg:
                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);
-next:
-               rq = blk_fetch_request(q);
        }
 }
 
@@ -1526,13 +1542,17 @@ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
-       sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
-       unsigned int bio_sectors = bmd->bi_size >> 9;
+       unsigned int chunk_sectors;
+       sector_t sector;
+       unsigned int bio_sectors;
        int max;
 
+       chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+       sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
+       bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+
        max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
-                                + bio_sectors)) << 9;
+                                + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        if (max <= bvec->bv_len && bio_sectors == 0)
@@ -1564,23 +1584,24 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
 {
        ssize_t rc;
        struct rbd_image_header_ondisk *dh;
-       int snap_count = 0;
-       u64 snap_names_len = 0;
+       u32 snap_count = 0;
        u64 ver;
+       size_t len;
 
+       /*
+        * First reads the fixed-size header to determine the number
+        * of snapshots, then re-reads it, along with all snapshot
+        * records as well as their stored names.
+        */
+       len = sizeof (*dh);
        while (1) {
-               int len = sizeof(*dh) +
-                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
-                         snap_names_len;
-
-               rc = -ENOMEM;
                dh = kmalloc(len, GFP_KERNEL);
                if (!dh)
                        return -ENOMEM;
 
                rc = rbd_req_sync_read(rbd_dev,
                                       NULL, CEPH_NOSNAP,
-                                      rbd_dev->obj_md_name,
+                                      rbd_dev->header_name,
                                       0, len,
                                       (char *)dh, &ver);
                if (rc < 0)
@@ -1588,21 +1609,23 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
 
                rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
                if (rc < 0) {
-                       if (rc == -ENXIO) {
+                       if (rc == -ENXIO)
                                pr_warning("unrecognized header format"
-                                          " for image %s", rbd_dev->obj);
-                       }
+                                          " for image %s\n",
+                                          rbd_dev->image_name);
                        goto out_dh;
                }
 
-               if (snap_count != header->total_snaps) {
-                       snap_count = header->total_snaps;
-                       snap_names_len = header->snap_names_len;
-                       rbd_header_free(header);
-                       kfree(dh);
-                       continue;
-               }
-               break;
+               if (snap_count == header->total_snaps)
+                       break;
+
+               snap_count = header->total_snaps;
+               len = sizeof (*dh) +
+                       snap_count * sizeof(struct rbd_image_snap_ondisk) +
+                       header->snap_names_len;
+
+               rbd_header_free(header);
+               kfree(dh);
        }
        header->obj_version = ver;
 
@@ -1614,7 +1637,7 @@ out_dh:
 /*
  * create a snapshot
  */
-static int rbd_header_add_snap(struct rbd_device *dev,
+static int rbd_header_add_snap(struct rbd_device *rbd_dev,
                               const char *snap_name,
                               gfp_t gfp_flags)
 {
@@ -1623,13 +1646,14 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        int ret;
        void *data, *p, *e;
        u64 ver;
+       struct ceph_mon_client *monc;
 
        /* we should create a snapshot only if we're pointing at the head */
-       if (dev->cur_snap)
+       if (rbd_dev->snap_id != CEPH_NOSNAP)
                return -EINVAL;
 
-       ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
-                                     &new_snapid);
+       monc = &rbd_dev->rbd_client->client->monc;
+       ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
        dout("created snapid=%lld\n", new_snapid);
        if (ret < 0)
                return ret;
@@ -1644,7 +1668,8 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
        ceph_encode_64_safe(&p, e, new_snapid, bad);
 
-       ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "snap_add",
                                data, p - data, &ver);
 
        kfree(data);
@@ -1652,7 +1677,9 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        if (ret < 0)
                return ret;
 
-       dev->header.snapc->seq =  new_snapid;
+       down_write(&rbd_dev->header_rwsem);
+       rbd_dev->header.snapc->seq = new_snapid;
+       up_write(&rbd_dev->header_rwsem);
 
        return 0;
 bad:
@@ -1672,7 +1699,7 @@ static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_update_snaps(struct rbd_device *rbd_dev)
+static int __rbd_refresh_header(struct rbd_device *rbd_dev)
 {
        int ret;
        struct rbd_image_header h;
@@ -1684,9 +1711,9 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
                return ret;
 
        /* resized? */
-       set_capacity(rbd_dev->disk, h.image_size / 512ULL);
+       set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
 
-       down_write(&rbd_dev->header.snap_rwsem);
+       down_write(&rbd_dev->header_rwsem);
 
        snap_seq = rbd_dev->header.snapc->seq;
        if (rbd_dev->header.total_snaps &&
@@ -1695,15 +1722,20 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
                   if head moves */
                follow_seq = 1;
 
-       kfree(rbd_dev->header.snapc);
-       kfree(rbd_dev->header.snap_names);
+       /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
+       kfree(rbd_dev->header.snap_names);
+       kfree(rbd_dev->header.snapc);
 
        rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_names_len = h.snap_names_len;
        rbd_dev->header.snap_sizes = h.snap_sizes;
+       /* Free the extra copy of the object prefix */
+       WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
+       kfree(h.object_prefix);
+
        if (follow_seq)
                rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
        else
@@ -1711,7 +1743,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
 
        ret = __rbd_init_snaps_header(rbd_dev);
 
-       up_write(&rbd_dev->header.snap_rwsem);
+       up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
@@ -1721,6 +1753,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
+       u64 segment_size;
        u64 total_size = 0;
 
        /* contact OSD, request size info about the object being mapped */
@@ -1733,7 +1766,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        if (rc)
                return rc;
 
-       rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
+       rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;
 
@@ -1743,7 +1776,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        if (!disk)
                goto out;
 
-       snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
+       snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
@@ -1756,11 +1789,15 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        if (!q)
                goto out_disk;
 
+       /* We use the default size, but let's be explicit about it. */
+       blk_queue_physical_block_size(q, SECTOR_SIZE);
+
        /* set io sizes to object size */
-       blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
-       blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
-       blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
-       blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
+       segment_size = rbd_obj_bytes(&rbd_dev->header);
+       blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
+       blk_queue_max_segment_size(q, segment_size);
+       blk_queue_io_min(q, segment_size);
+       blk_queue_io_opt(q, segment_size);
 
        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;
@@ -1771,7 +1808,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        rbd_dev->q = q;
 
        /* finally, announce the disk to the world */
-       set_capacity(disk, total_size / 512ULL);
+       set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);
 
        pr_info("%s: added with size 0x%llx\n",
@@ -1788,10 +1825,15 @@ out:
   sysfs
 */
 
+static struct rbd_device *dev_to_rbd_dev(struct device *dev)
+{
+       return container_of(dev, struct rbd_device, dev);
+}
+
 static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
 }
@@ -1799,7 +1841,7 @@ static ssize_t rbd_size_show(struct device *dev,
 static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%d\n", rbd_dev->major);
 }
@@ -1807,32 +1849,41 @@ static ssize_t rbd_major_show(struct device *dev,
 static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
+       return sprintf(buf, "client%lld\n",
+                       ceph_client_id(rbd_dev->rbd_client->client));
 }
 
 static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%s\n", rbd_dev->pool_name);
 }
 
+static ssize_t rbd_pool_id_show(struct device *dev,
+                            struct device_attribute *attr, char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       return sprintf(buf, "%d\n", rbd_dev->pool_id);
+}
+
 static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->obj);
+       return sprintf(buf, "%s\n", rbd_dev->image_name);
 }
 
 static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
        return sprintf(buf, "%s\n", rbd_dev->snap_name);
 }
@@ -1842,13 +1893,13 @@ static ssize_t rbd_image_refresh(struct device *dev,
                                 const char *buf,
                                 size_t size)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int rc;
        int ret = size;
 
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 
-       rc = __rbd_update_snaps(rbd_dev);
+       rc = __rbd_refresh_header(rbd_dev);
        if (rc < 0)
                ret = rc;
 
@@ -1860,6 +1911,7 @@ static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
+static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
@@ -1870,6 +1922,7 @@ static struct attribute *rbd_attrs[] = {
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
+       &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
@@ -1907,7 +1960,7 @@ static ssize_t rbd_snap_size_show(struct device *dev,
 {
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
 
-       return sprintf(buf, "%lld\n", (long long)snap->size);
+       return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
 }
 
 static ssize_t rbd_snap_id_show(struct device *dev,
@@ -1916,7 +1969,7 @@ static ssize_t rbd_snap_id_show(struct device *dev,
 {
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
 
-       return sprintf(buf, "%lld\n", (long long)snap->id);
+       return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
 }
 
 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
@@ -2088,19 +2141,9 @@ static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
        return 0;
 }
 
-
-static void rbd_root_dev_release(struct device *dev)
-{
-}
-
-static struct device rbd_root_dev = {
-       .init_name =    "rbd",
-       .release =      rbd_root_dev_release,
-};
-
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
-       int ret = -ENOMEM;
+       int ret;
        struct device *dev;
        struct rbd_snap *snap;
 
@@ -2114,7 +2157,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
        dev_set_name(dev, "%d", rbd_dev->id);
        ret = device_register(dev);
        if (ret < 0)
-               goto done_free;
+               goto out;
 
        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                ret = rbd_register_snap_dev(rbd_dev, snap,
@@ -2122,10 +2165,7 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
                if (ret < 0)
                        break;
        }
-
-       mutex_unlock(&ctl_mutex);
-       return 0;
-done_free:
+out:
        mutex_unlock(&ctl_mutex);
        return ret;
 }
@@ -2140,11 +2180,11 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        int ret, rc;
 
        do {
-               ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
+               ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
                                         rbd_dev->header.obj_version);
                if (ret == -ERANGE) {
                        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-                       rc = __rbd_update_snaps(rbd_dev);
+                       rc = __rbd_refresh_header(rbd_dev);
                        mutex_unlock(&ctl_mutex);
                        if (rc < 0)
                                return rc;
@@ -2154,104 +2194,307 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        return ret;
 }
 
+static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
+
+/*
+ * Get a unique rbd identifier for the given new rbd_dev, and add
+ * the rbd_dev to the global list.  The minimum rbd id is 1.
+ */
+static void rbd_id_get(struct rbd_device *rbd_dev)
+{
+       rbd_dev->id = atomic64_inc_return(&rbd_id_max);
+
+       spin_lock(&rbd_dev_list_lock);
+       list_add_tail(&rbd_dev->node, &rbd_dev_list);
+       spin_unlock(&rbd_dev_list_lock);
+}
+
+/*
+ * Remove an rbd_dev from the global list, and record that its
+ * identifier is no longer in use.
+ */
+static void rbd_id_put(struct rbd_device *rbd_dev)
+{
+       struct list_head *tmp;
+       int rbd_id = rbd_dev->id;
+       int max_id;
+
+       BUG_ON(rbd_id < 1);
+
+       spin_lock(&rbd_dev_list_lock);
+       list_del_init(&rbd_dev->node);
+
+       /*
+        * If the id being "put" is not the current maximum, there
+        * is nothing special we need to do.
+        */
+       if (rbd_id != atomic64_read(&rbd_id_max)) {
+               spin_unlock(&rbd_dev_list_lock);
+               return;
+       }
+
+       /*
+        * We need to update the current maximum id.  Search the
+        * remaining entries (backward, as the maximum is likely near
+        * the end), comparing each entry's own id, not the one put.
+        */
+       max_id = 0;
+       list_for_each_prev(tmp, &rbd_dev_list) {
+               struct rbd_device *cur;
+
+               cur = list_entry(tmp, struct rbd_device, node);
+               if (cur->id > max_id)
+                       max_id = cur->id;
+       }
+       spin_unlock(&rbd_dev_list_lock);
+
+       /*
+        * The max id could have been updated by rbd_id_get(), in
+        * which case it now accurately reflects the new maximum.
+        * Be careful not to overwrite the maximum value in that
+        * case.
+        */
+       atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
+}
+
+/*
+ * Skips over white space at *buf, and updates *buf to point to the
+ * first found non-space character (if any). Returns the length of
+ * the token (string of non-white space characters) found.  Note
+ * that *buf must be terminated with '\0'.
+ */
+static inline size_t next_token(const char **buf)
+{
+        /*
+        * These are the characters that produce nonzero for
+        * isspace() in the "C" and "POSIX" locales.
+        */
+        const char *spaces = " \f\n\r\t\v";
+
+        *buf += strspn(*buf, spaces);  /* Find start of token */
+
+       return strcspn(*buf, spaces);   /* Return token length */
+}
+
+/*
+ * Finds the next token in *buf, and if the provided token buffer is
+ * big enough, copies the found token into it.  The result, if
+ * copied, is guaranteed to be terminated with '\0'.  Note that *buf
+ * must be terminated with '\0' on entry.
+ *
+ * Returns the length of the token found (not including the '\0').
+ * Return value will be 0 if no token is found, and it will be >=
+ * token_size if the token would not fit.
+ *
+ * The *buf pointer will be updated to point beyond the end of the
+ * found token.  Note that this occurs even if the token buffer is
+ * too small to hold it.
+ */
+static inline size_t copy_token(const char **buf,
+                               char *token,
+                               size_t token_size)
+{
+        size_t len;
+
+       len = next_token(buf);
+       if (len < token_size) {
+               memcpy(token, *buf, len);
+               *(token + len) = '\0';
+       }
+       *buf += len;
+
+        return len;
+}
+
+/*
+ * Finds the next token in *buf, dynamically allocates a buffer big
+ * enough to hold a copy of it, and copies the token into the new
+ * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
+ * that a duplicate buffer is created even for a zero-length token.
+ *
+ * Returns a pointer to the newly-allocated duplicate, or a null
+ * pointer if memory for the duplicate was not available.  If
+ * the lenp argument is a non-null pointer, the length of the token
+ * (not including the '\0') is returned in *lenp.
+ *
+ * If successful, the *buf pointer will be updated to point beyond
+ * the end of the found token.
+ *
+ * Note: uses GFP_KERNEL for allocation.
+ */
+static inline char *dup_token(const char **buf, size_t *lenp)
+{
+       char *dup;
+       size_t len;
+
+       len = next_token(buf);
+       dup = kmalloc(len + 1, GFP_KERNEL);
+       if (!dup)
+               return NULL;
+
+       memcpy(dup, *buf, len);
+       *(dup + len) = '\0';
+       *buf += len;
+
+       if (lenp)
+               *lenp = len;
+
+       return dup;
+}
+
+/*
+ * This fills in the pool_name, image_name, image_name_len,
+ * header_name, and snap_name fields of the given rbd_dev, based
+ * on the list of monitor addresses and other options provided via
+ * /sys/bus/rbd/add.
+ *
+ * Note: rbd_dev is assumed to have been initially zero-filled.
+ */
+static int rbd_add_parse_args(struct rbd_device *rbd_dev,
+                             const char *buf,
+                             const char **mon_addrs,
+                             size_t *mon_addrs_size,
+                             char *options,
+                            size_t options_size)
+{
+       size_t len;
+       int ret;
+
+       /* The first four tokens are required */
+
+       len = next_token(&buf);
+       if (!len)
+               return -EINVAL;
+       *mon_addrs_size = len + 1;
+       *mon_addrs = buf;
+
+       buf += len;
+
+       len = copy_token(&buf, options, options_size);
+       if (!len || len >= options_size)
+               return -EINVAL;
+
+       ret = -ENOMEM;
+       rbd_dev->pool_name = dup_token(&buf, NULL);
+       if (!rbd_dev->pool_name)
+               goto out_err;
+
+       rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
+       if (!rbd_dev->image_name)
+               goto out_err;
+
+       /* Create the name of the header object */
+
+       rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
+                                               + sizeof (RBD_SUFFIX),
+                                       GFP_KERNEL);
+       if (!rbd_dev->header_name)
+               goto out_err;
+       sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+
+       /*
+        * The snapshot name is optional.  If none is supplied,
+        * we use the default value.
+        */
+       rbd_dev->snap_name = dup_token(&buf, &len);
+       if (!rbd_dev->snap_name)
+               goto out_err;
+       if (!len) {
+               /* Replace the empty name with the default */
+               kfree(rbd_dev->snap_name);
+               rbd_dev->snap_name
+                       = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
+               if (!rbd_dev->snap_name)
+                       goto out_err;
+
+               memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
+                       sizeof (RBD_SNAP_HEAD_NAME));
+       }
+
+       return 0;
+
+out_err:
+       kfree(rbd_dev->header_name);
+       kfree(rbd_dev->image_name);
+       kfree(rbd_dev->pool_name);
+       rbd_dev->pool_name = NULL;
+
+       return ret;
+}
+
 static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
 {
-       struct ceph_osd_client *osdc;
-       struct rbd_device *rbd_dev;
-       ssize_t rc = -ENOMEM;
-       int irc, new_id = 0;
-       struct list_head *tmp;
-       char *mon_dev_name;
        char *options;
+       struct rbd_device *rbd_dev = NULL;
+       const char *mon_addrs = NULL;
+       size_t mon_addrs_size = 0;
+       struct ceph_osd_client *osdc;
+       int rc = -ENOMEM;
 
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;
 
-       mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
-       if (!mon_dev_name)
-               goto err_out_mod;
-
-       options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
+       options = kmalloc(count, GFP_KERNEL);
        if (!options)
-               goto err_mon_dev;
-
-       /* new rbd_device object */
+               goto err_nomem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
-               goto err_out_opt;
+               goto err_nomem;
 
        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
+       init_rwsem(&rbd_dev->header_rwsem);
 
-       init_rwsem(&rbd_dev->header.snap_rwsem);
+       init_rwsem(&rbd_dev->header_rwsem);
 
        /* generate unique id: find highest unique id, add one */
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-       list_for_each(tmp, &rbd_dev_list) {
-               struct rbd_device *rbd_dev;
-
-               rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_dev->id >= new_id)
-                       new_id = rbd_dev->id + 1;
-       }
+       rbd_id_get(rbd_dev);
 
-       rbd_dev->id = new_id;
-
-       /* add to global list */
-       list_add_tail(&rbd_dev->node, &rbd_dev_list);
+       /* Fill in the device name, now that we have its id. */
+       BUILD_BUG_ON(DEV_NAME_LEN
+                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
+       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
 
        /* parse add command */
-       if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
-                  "%" __stringify(RBD_MAX_OPT_LEN) "s "
-                  "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
-                  "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
-                  "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
-                  mon_dev_name, options, rbd_dev->pool_name,
-                  rbd_dev->obj, rbd_dev->snap_name) < 4) {
-               rc = -EINVAL;
-               goto err_out_slot;
-       }
-
-       if (rbd_dev->snap_name[0] == 0)
-               rbd_dev->snap_name[0] = '-';
-
-       rbd_dev->obj_len = strlen(rbd_dev->obj);
-       snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
-                rbd_dev->obj, RBD_SUFFIX);
-
-       /* initialize rest of new object */
-       snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
-       rc = rbd_get_client(rbd_dev, mon_dev_name, options);
-       if (rc < 0)
-               goto err_out_slot;
+       rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
+                               options, count);
+       if (rc)
+               goto err_put_id;
 
-       mutex_unlock(&ctl_mutex);
+       rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
+                                               options);
+       if (IS_ERR(rbd_dev->rbd_client)) {
+               rc = PTR_ERR(rbd_dev->rbd_client);
+               goto err_put_id;
+       }
 
        /* pick the pool */
-       osdc = &rbd_dev->client->osdc;
+       osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
        if (rc < 0)
                goto err_out_client;
-       rbd_dev->poolid = rc;
+       rbd_dev->pool_id = rc;
 
        /* register our block device */
-       irc = register_blkdev(0, rbd_dev->name);
-       if (irc < 0) {
-               rc = irc;
+       rc = register_blkdev(0, rbd_dev->name);
+       if (rc < 0)
                goto err_out_client;
-       }
-       rbd_dev->major = irc;
+       rbd_dev->major = rc;
 
        rc = rbd_bus_add_dev(rbd_dev);
        if (rc)
                goto err_out_blkdev;
 
-       /* set up and announce blkdev mapping */
+       /*
+        * At this point cleanup in the event of an error is the job
+        * of the sysfs code (initiated by rbd_bus_del_dev()).
+        *
+        * Set up and announce blkdev mapping.
+        */
        rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_bus;
@@ -2263,35 +2506,32 @@ static ssize_t rbd_add(struct bus_type *bus,
        return count;
 
 err_out_bus:
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       list_del_init(&rbd_dev->node);
-       mutex_unlock(&ctl_mutex);
-
        /* this will also clean up rest of rbd_dev stuff */
 
        rbd_bus_del_dev(rbd_dev);
        kfree(options);
-       kfree(mon_dev_name);
        return rc;
 
 err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_client:
        rbd_put_client(rbd_dev);
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-err_out_slot:
-       list_del_init(&rbd_dev->node);
-       mutex_unlock(&ctl_mutex);
-
+err_put_id:
+       if (rbd_dev->pool_name) {
+               kfree(rbd_dev->snap_name);
+               kfree(rbd_dev->header_name);
+               kfree(rbd_dev->image_name);
+               kfree(rbd_dev->pool_name);
+       }
+       rbd_id_put(rbd_dev);
+err_nomem:
        kfree(rbd_dev);
-err_out_opt:
        kfree(options);
-err_mon_dev:
-       kfree(mon_dev_name);
-err_out_mod:
+
        dout("Error adding device %s\n", buf);
        module_put(THIS_MODULE);
-       return rc;
+
+       return (ssize_t) rc;
 }
 
 static struct rbd_device *__rbd_get_dev(unsigned long id)
@@ -2299,30 +2539,43 @@ static struct rbd_device *__rbd_get_dev(unsigned long id)
        struct list_head *tmp;
        struct rbd_device *rbd_dev;
 
+       spin_lock(&rbd_dev_list_lock);
        list_for_each(tmp, &rbd_dev_list) {
                rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_dev->id == id)
+               if (rbd_dev->id == id) {
+                       spin_unlock(&rbd_dev_list_lock);
                        return rbd_dev;
+               }
        }
+       spin_unlock(&rbd_dev_list_lock);
        return NULL;
 }
 
 static void rbd_dev_release(struct device *dev)
 {
-       struct rbd_device *rbd_dev =
-                       container_of(dev, struct rbd_device, dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       if (rbd_dev->watch_request) {
+               struct ceph_client *client = rbd_dev->rbd_client->client;
 
-       if (rbd_dev->watch_request)
-               ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
+               ceph_osdc_unregister_linger_request(&client->osdc,
                                                    rbd_dev->watch_request);
+       }
        if (rbd_dev->watch_event)
-               rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
+               rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);
 
        rbd_put_client(rbd_dev);
 
        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
+
+       /* done with the id, and with the rbd_dev */
+       kfree(rbd_dev->snap_name);
+       kfree(rbd_dev->header_name);
+       kfree(rbd_dev->pool_name);
+       kfree(rbd_dev->image_name);
+       rbd_id_put(rbd_dev);
        kfree(rbd_dev);
 
        /* release module ref */
@@ -2355,8 +2608,6 @@ static ssize_t rbd_remove(struct bus_type *bus,
                goto done;
        }
 
-       list_del_init(&rbd_dev->node);
-
        __rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
 
@@ -2370,7 +2621,7 @@ static ssize_t rbd_snap_add(struct device *dev,
                            const char *buf,
                            size_t count)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd(dev);
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;
        char *name = kmalloc(count + 1, GFP_KERNEL);
        if (!name)
@@ -2385,7 +2636,7 @@ static ssize_t rbd_snap_add(struct device *dev,
        if (ret < 0)
                goto err_unlock;
 
-       ret = __rbd_update_snaps(rbd_dev);
+       ret = __rbd_refresh_header(rbd_dev);
        if (ret < 0)
                goto err_unlock;
 
@@ -2394,7 +2645,7 @@ static ssize_t rbd_snap_add(struct device *dev,
        mutex_unlock(&ctl_mutex);
 
        /* make a best effort, don't error if failed */
-       rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
+       rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);
 
        ret = count;
        kfree(name);
@@ -2406,12 +2657,6 @@ err_unlock:
        return ret;
 }
 
-static struct bus_attribute rbd_bus_attrs[] = {
-       __ATTR(add, S_IWUSR, NULL, rbd_add),
-       __ATTR(remove, S_IWUSR, NULL, rbd_remove),
-       __ATTR_NULL
-};
-
 /*
  * create control files in sysfs
  * /sys/bus/rbd/...
@@ -2420,21 +2665,21 @@ static int rbd_sysfs_init(void)
 {
        int ret;
 
-       rbd_bus_type.bus_attrs = rbd_bus_attrs;
-
-       ret = bus_register(&rbd_bus_type);
-        if (ret < 0)
+       ret = device_register(&rbd_root_dev);
+       if (ret < 0)
                return ret;
 
-       ret = device_register(&rbd_root_dev);
+       ret = bus_register(&rbd_bus_type);
+       if (ret < 0)
+               device_unregister(&rbd_root_dev);
 
        return ret;
 }
 
 static void rbd_sysfs_cleanup(void)
 {
-       device_unregister(&rbd_root_dev);
        bus_unregister(&rbd_bus_type);
+       device_unregister(&rbd_root_dev);       /* last: mirrors rbd_sysfs_init(), which registers it first */
 }
 
 int __init rbd_init(void)
@@ -2444,8 +2689,7 @@ int __init rbd_init(void)
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
-       spin_lock_init(&node_lock);
-       pr_info("loaded " DRV_NAME_LONG "\n");
+       pr_info("loaded " RBD_DRV_NAME_LONG "\n");
        return 0;
 }