2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
/*
 * NOTE(review): this file is a sampled/garbled extract of the kernel rbd
 * driver; the leading integer on each line is the original source line
 * number fused into the text, and many interior lines are missing.
 * Comments below annotate only what the visible text establishes.
 */
/* Sector geometry: Linux block I/O universally assumes 512-byte sectors. */
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
/* Driver identity strings used for device naming and log messages. */
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
/* Limits on snapshot-name and mount-option string lengths. */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
/* Reserved snapshot name meaning "no snapshot: map the image head". */
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
/* Upper bound on the decimal width of an int: (bits*5/16)+1 digits. */
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
/* Default timeout (seconds) applied to osd notify requests. */
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
/*
 * struct rbd_image_header: in-memory translation of the on-disk image
 * header.  Only two fields are visible here (snapshot context and the
 * byte length of the packed NUL-separated snapshot-name blob); other
 * fields referenced later (image_size, obj_order, snap_names,
 * snap_sizes, total_snaps, object_prefix, ...) are elided in this
 * extract.
 */
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
/*
 * struct rbd_client (declaration line elided): one kref-counted ceph
 * client instance; multiple mapped devices may share it via
 * rbd_client_list (see rbd_get_client/__rbd_client_find below).
 */
99 * an instance of the client. multiple devices may share an rbd client.
102 struct ceph_client *client;
103 struct rbd_options *rbd_opts;
105 struct list_head node;
109 * a request completion status
/* Per-segment completion record, one slot per OSD request of a blk rq. */
111 struct rbd_req_status {
118 * a collection of requests
/*
 * A collection ties the N per-segment OSD requests of one block-layer
 * request together; status[] is a trailing variable-length array
 * (old-style [0] flexible member, sized in rbd_alloc_coll()).
 */
120 struct rbd_req_coll {
124 struct rbd_req_status status[0];
128 * a single io request
/* Per-OSD-request bookkeeping handed to the completion callback. */
131 struct request *rq; /* blk layer request */
132 struct bio *bio; /* cloned bio */
133 struct page **pages; /* list of used pages */
136 struct rbd_req_coll *coll;
143 struct list_head node;
/*
 * struct rbd_device (declaration line elided): one mapped image.
 * header_rwsem guards header updates; lock is the request-queue lock.
 */
151 int id; /* blkdev unique id */
153 int major; /* blkdev assigned major */
154 struct gendisk *disk; /* blkdev's gendisk and rq */
155 struct request_queue *q;
157 struct rbd_client *rbd_client;
159 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
161 spinlock_t lock; /* queue lock */
163 struct rbd_image_header header;
164 char *obj; /* rbd image name */
166 char *obj_md_name; /* hdr nm. */
/* Watch registration on the header object, for update notifications. */
170 struct ceph_osd_event *watch_event;
171 struct ceph_osd_request *watch_request;
173 /* protects updating the header */
174 struct rw_semaphore header_rwsem;
175 char snap_name[RBD_MAX_SNAP_NAME_LEN];
176 u64 snap_id; /* current snapshot id */
179 struct list_head node;
181 /* list of snapshots */
182 struct list_head snaps;
188 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
/* Global device and client registries, each with its own spinlock. */
190 static LIST_HEAD(rbd_dev_list); /* devices */
191 static DEFINE_SPINLOCK(rbd_dev_list_lock);
193 static LIST_HEAD(rbd_client_list); /* clients */
194 static DEFINE_SPINLOCK(rbd_client_list_lock);
/* Forward declarations for snapshot/sysfs helpers defined later. */
196 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
197 static void rbd_dev_release(struct device *dev);
198 static ssize_t rbd_snap_add(struct device *dev,
199 struct device_attribute *attr,
202 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
203 struct rbd_snap *snap);
205 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
207 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* /sys/bus/rbd/{add,remove}: write-only (root) attributes that map and
 * unmap images. */
210 static struct bus_attribute rbd_bus_attrs[] = {
211 __ATTR(add, S_IWUSR, NULL, rbd_add),
212 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 static struct bus_type rbd_bus_type = {
218 .bus_attrs = rbd_bus_attrs,
/* Static root device: nothing to free, so release is (visibly) empty. */
221 static void rbd_root_dev_release(struct device *dev)
225 static struct device rbd_root_dev = {
227 .release = rbd_root_dev_release,
/* Take/drop a reference on the embedded struct device of an rbd_dev. */
231 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
233 return get_device(&rbd_dev->dev);
236 static void rbd_put_dev(struct rbd_device *rbd_dev)
238 put_device(&rbd_dev->dev);
241 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
/*
 * Block-device open: pin the rbd_device, propagate its read-only flag
 * to the bdev, and refuse writable opens of a read-only mapping.
 * NOTE(review): the elided early return after the FMODE_WRITE check
 * presumably happens after rbd_get_dev(); confirm the device reference
 * is dropped on that -EROFS path, otherwise it leaks.
 */
243 static int rbd_open(struct block_device *bdev, fmode_t mode)
245 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
247 rbd_get_dev(rbd_dev);
249 set_device_ro(bdev, rbd_dev->read_only);
251 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
/* Release: drop the reference taken in rbd_open(). */
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 struct rbd_device *rbd_dev = disk->private_data;
261 rbd_put_dev(rbd_dev);
/* Block-device ops table; .open is elided in this extract. */
266 static const struct block_device_operations rbd_bd_ops = {
267 .owner = THIS_MODULE,
269 .release = rbd_release,
273 * Initialize an rbd client instance.
/*
 * Allocate a new rbd_client, create and open a ceph session under
 * ctl_mutex, then publish it on rbd_client_list.  On success ownership
 * of @opt passes to the ceph client (see the opt = NULL line); on the
 * error paths the client and/or options are destroyed.  Returns the
 * new client (error-path return values elided in this extract).
 */
276 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
277 struct rbd_options *rbd_opts)
279 struct rbd_client *rbdc;
282 dout("rbd_client_create\n");
283 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 kref_init(&rbdc->kref);
288 INIT_LIST_HEAD(&rbdc->node);
/* Nested class: ctl_mutex may already be held by the mapping path. */
290 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
293 if (IS_ERR(rbdc->client))
295 opt = NULL; /* Now rbdc->client is responsible for opt */
297 ret = ceph_open_session(rbdc->client);
301 rbdc->rbd_opts = rbd_opts;
303 spin_lock(&rbd_client_list_lock);
304 list_add_tail(&rbdc->node, &rbd_client_list);
305 spin_unlock(&rbd_client_list_lock);
307 mutex_unlock(&ctl_mutex);
309 dout("rbd_client_create created %p\n", rbdc);
/* Error unwinding (labels elided): destroy session, drop mutex, and
 * free @opt only if ownership was never transferred. */
313 ceph_destroy_client(rbdc->client);
315 mutex_unlock(&ctl_mutex);
319 ceph_destroy_options(opt);
324 * Find a ceph client with specific addr and configuration.
/*
 * Scan rbd_client_list for a client with matching options so mappings
 * can share a session.  CEPH_OPT_NOSHARE opts never match (the elided
 * branch presumably returns NULL).  Caller holds rbd_client_list_lock.
 */
326 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
328 struct rbd_client *client_node;
330 if (opt->flags & CEPH_OPT_NOSHARE)
333 list_for_each_entry(client_node, &rbd_client_list, node)
334 if (ceph_compare_options(opt, client_node->client) == 0)
347 /* string args above */
/* Token table for rbd-specific mount options (only notify_timeout is
 * visible in this extract). */
350 static match_table_t rbdopt_tokens = {
351 {Opt_notify_timeout, "notify_timeout=%d"},
353 /* string args above */
/*
 * ceph_parse_options() callback: parse one rbd-specific option token
 * into the rbd_options passed as @private.  Int tokens fall below
 * Opt_last_int, string tokens between Opt_last_int and Opt_last_string.
 */
357 static int parse_rbd_opts_token(char *c, void *private)
359 struct rbd_options *rbdopt = private;
360 substring_t argstr[MAX_OPT_ARGS];
361 int token, intval, ret;
363 token = match_token(c, rbdopt_tokens, argstr);
367 if (token < Opt_last_int) {
368 ret = match_int(&argstr[0], &intval);
370 pr_err("bad mount option arg (not int) "
374 dout("got int token %d val %d\n", token, intval);
375 } else if (token > Opt_last_int && token < Opt_last_string) {
376 dout("got string token %d val %s\n", token,
379 dout("got token %d\n", token);
/* switch (token) — only the notify_timeout case is visible here. */
383 case Opt_notify_timeout:
384 rbdopt->notify_timeout = intval;
393 * Get a ceph client with specific addr and configuration, if one does
394 * not exist create it.
/*
 * Parse monitor address + options, then either reuse a matching cached
 * client (kref_get) or build a fresh one via rbd_client_create().
 * NOTE(review): on the ceph_parse_options() failure path the visible
 * lines return ERR_CAST(opt) without freeing rbd_opts — confirm the
 * elided lines kfree() it, otherwise this leaks.
 */
396 static struct rbd_client *rbd_get_client(const char *mon_addr,
400 struct rbd_client *rbdc;
401 struct ceph_options *opt;
402 struct rbd_options *rbd_opts;
404 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
406 return ERR_PTR(-ENOMEM);
408 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
410 opt = ceph_parse_options(options, mon_addr,
411 mon_addr + mon_addr_len,
412 parse_rbd_opts_token, rbd_opts);
415 return ERR_CAST(opt);
418 spin_lock(&rbd_client_list_lock);
419 rbdc = __rbd_client_find(opt);
421 /* using an existing client */
422 kref_get(&rbdc->kref);
423 spin_unlock(&rbd_client_list_lock);
/* Shared client found: the freshly parsed options are redundant. */
425 ceph_destroy_options(opt);
430 spin_unlock(&rbd_client_list_lock);
432 rbdc = rbd_client_create(opt, rbd_opts);
441 * Destroy ceph client
443 * Caller must hold rbd_client_list_lock.
/*
 * kref release: unlink from rbd_client_list and tear the session down.
 * NOTE(review): the comment above ("caller must hold the lock") is
 * stale — the function visibly takes rbd_client_list_lock itself.
 */
445 static void rbd_client_release(struct kref *kref)
447 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
449 dout("rbd_release_client %p\n", rbdc);
450 spin_lock(&rbd_client_list_lock);
451 list_del(&rbdc->node);
452 spin_unlock(&rbd_client_list_lock);
454 ceph_destroy_client(rbdc->client);
455 kfree(rbdc->rbd_opts);
460 * Drop reference to ceph client node. If it's not referenced anymore, release
/* Drop this device's reference; frees the client on last put. */
463 static void rbd_put_client(struct rbd_device *rbd_dev)
465 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
466 rbd_dev->rbd_client = NULL;
470 * Destroy requests collection
/* kref release for a request collection (the kfree line is elided). */
472 static void rbd_coll_release(struct kref *kref)
474 struct rbd_req_coll *coll =
475 container_of(kref, struct rbd_req_coll, kref);
477 dout("rbd_coll_release %p\n", coll);
482 * Create a new header structure, translate header format from the on-disk
/*
 * Translate the little-endian on-disk header into the in-memory form.
 * Validates the magic text and bounds the snapshot count before sizing
 * the snap-context allocation.  Called twice by rbd_read_header(): once
 * with a minimal buffer to learn snap_count, then with a full buffer
 * (@allocated_snaps tells which case we are in).  Error unwinding at
 * the bottom frees whatever was allocated (labels elided).
 */
485 static int rbd_header_from_disk(struct rbd_image_header *header,
486 struct rbd_image_header_ondisk *ondisk,
492 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
495 snap_count = le32_to_cpu(ondisk->snap_count);
/* Reject counts that would overflow the snapc allocation size below. */
496 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
499 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
500 snap_count * sizeof(u64),
505 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
/* NOTE(review): snap_names_len comes straight from untrusted on-disk
 * data and is passed to kmalloc unbounded — confirm elided validation. */
507 header->snap_names = kmalloc(header->snap_names_len,
509 if (!header->snap_names)
511 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
513 if (!header->snap_sizes)
/* snap_count == 0 branch (condition elided): no name/size arrays. */
516 header->snap_names = NULL;
517 header->snap_sizes = NULL;
/* Copy the fixed-size block-name prefix and force NUL termination. */
520 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
522 if (!header->object_prefix)
525 memcpy(header->object_prefix, ondisk->block_name,
526 sizeof(ondisk->block_name));
527 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
529 header->image_size = le64_to_cpu(ondisk->image_size);
530 header->obj_order = ondisk->options.order;
531 header->crypt_type = ondisk->options.crypt_type;
532 header->comp_type = ondisk->options.comp_type;
534 atomic_set(&header->snapc->nref, 1);
535 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
536 header->snapc->num_snaps = snap_count;
537 header->total_snaps = snap_count;
/* Only fill snapshot ids/sizes/names when the caller's buffer actually
 * holds all snap_count records. */
539 if (snap_count && allocated_snaps == snap_count) {
540 for (i = 0; i < snap_count; i++) {
541 header->snapc->snaps[i] =
542 le64_to_cpu(ondisk->snaps[i].id);
543 header->snap_sizes[i] =
544 le64_to_cpu(ondisk->snaps[i].image_size);
547 /* copy snapshot names */
548 memcpy(header->snap_names, &ondisk->snaps[i],
549 header->snap_names_len);
/* Error unwinding: free in reverse allocation order. */
555 kfree(header->snap_sizes);
557 kfree(header->snap_names);
559 kfree(header->snapc);
/*
 * Look up a snapshot by name in the packed NUL-separated name blob;
 * on a match, report its id (*seq) and/or size (*size) — both outputs
 * look optional given the visible guards are elided.
 */
563 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
567 char *p = header->snap_names;
569 for (i = 0; i < header->total_snaps; i++) {
570 if (!strcmp(snap_name, p)) {
572 /* Found it. Pass back its id and/or size */
575 *seq = header->snapc->snaps[i];
577 *size = header->snap_sizes[i];
580 p += strlen(p) + 1; /* Skip ahead to the next name */
/*
 * Point the device at the snapshot named in dev->snap_name (or at the
 * live head for RBD_SNAP_HEAD_NAME), setting dev->snap_id, snapc->seq
 * and returning the mapped size via *size.  Takes header_rwsem for
 * writing around the update.
 */
585 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
587 struct rbd_image_header *header = &dev->header;
588 struct ceph_snap_context *snapc = header->snapc;
/* Compile-time guarantee the head-name sentinel fits in snap_name. */
591 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
593 down_write(&dev->header_rwsem);
595 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
596 sizeof (RBD_SNAP_HEAD_NAME))) {
597 if (header->total_snaps)
598 snapc->seq = header->snap_seq;
601 dev->snap_id = CEPH_NOSNAP;
604 *size = header->image_size;
/* else: mapping a named snapshot — resolve it to seq/size. */
606 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
609 dev->snap_id = snapc->seq;
615 up_write(&dev->header_rwsem);
/* Free everything rbd_header_from_disk() allocated. */
619 static void rbd_header_free(struct rbd_image_header *header)
621 kfree(header->object_prefix);
622 kfree(header->snap_sizes);
623 kfree(header->snap_names);
624 kfree(header->snapc);
628 * get the actual striped segment name, offset and length
/*
 * Map an image byte extent onto one backing RADOS object: write the
 * object name "<prefix>.<seg#>" into @seg_name, return the in-object
 * offset via *segofs, and clamp the length to the object boundary
 * (the clamped length is presumably the u64 return; tail elided).
 */
630 static u64 rbd_get_segment(struct rbd_image_header *header,
631 const char *object_prefix,
633 char *seg_name, u64 *segofs)
635 u64 seg = ofs >> header->obj_order;
638 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
639 "%s.%012llx", object_prefix, seg);
/* NOTE(review): "1 << obj_order" is an int shift — for orders >= 31
 * this overflows; compare rbd_obj_bytes() below.  Confirm obj_order is
 * bounded elsewhere. */
641 ofs = ofs & ((1 << header->obj_order) - 1);
642 len = min_t(u64, len, (1 << header->obj_order) - ofs);
650 static int rbd_get_num_segments(struct rbd_image_header *header,
653 u64 start_seg = ofs >> header->obj_order;
654 u64 end_seg = (ofs + len - 1) >> header->obj_order;
655 return end_seg - start_seg + 1;
659 * returns the size of an object in the image
661 static u64 rbd_obj_bytes(struct rbd_image_header *header)
663 return 1 << header->obj_order;
/* Drop a reference on every bio in a singly linked bi_next chain
 * (the bio_put call inside the elided loop body is not visible). */
670 static void bio_chain_put(struct bio *chain)
676 chain = chain->bi_next;
682 * zeros a bio chain, starting at specific offset
/*
 * Zero the data of a bio chain from byte offset @start_ofs onward;
 * used to blank the tail of short or missing-object reads.  Walks each
 * segment, kmapping pages with irq-safe helpers.
 */
684 static void zero_bio_chain(struct bio *chain, int start_ofs)
693 bio_for_each_segment(bv, chain, i) {
694 if (pos + bv->bv_len > start_ofs) {
/* Partial first segment: zero only past start_ofs. */
695 int remainder = max(start_ofs - pos, 0);
696 buf = bvec_kmap_irq(bv, &flags);
697 memset(buf + remainder, 0,
698 bv->bv_len - remainder);
699 bvec_kunmap_irq(buf, &flags);
704 chain = chain->bi_next;
709 * bio_chain_clone - clone a chain of bios up to a certain length.
710 * might return a bio_pair that will need to be released.
/*
 * Clone bios from *old until @len bytes are covered, returning the new
 * chain.  A bio straddling the limit is split with bio_split() (single-
 * page bios only — split_bio BUG()s otherwise); *next receives the
 * remainder of the source chain for the caller's next segment, and *bp
 * the pair the caller must eventually release.
 */
712 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
713 struct bio_pair **bp,
714 int len, gfp_t gfpmask)
716 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
/* A leftover pair from the previous call is released first. */
720 bio_pair_release(*bp);
724 while (old_chain && (total < len)) {
725 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
729 if (total + old_chain->bi_size > len) {
733 * this split can only happen with a single paged bio,
734 * split_bio will BUG_ON if this is not the case
736 dout("bio_chain_clone split! total=%d remaining=%d"
738 (int)total, (int)len-total,
739 (int)old_chain->bi_size);
741 /* split the bio. We'll release it either in the next
742 call, or it will have to be released outside */
743 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
747 __bio_clone(tmp, &bp->bio1);
/* else: bio fits entirely — clone it whole and advance *next. */
751 __bio_clone(tmp, old_chain);
752 *next = old_chain->bi_next;
/* After the first allocation, never block: we may hold a lock. */
756 gfpmask &= ~__GFP_WAIT;
760 new_chain = tail = tmp;
765 old_chain = old_chain->bi_next;
767 total += tmp->bi_size;
773 tail->bi_next = NULL;
/* Error path: unwind the partially built clone chain. */
780 dout("bio_chain_clone with err\n");
781 bio_chain_put(new_chain);
786 * helpers for osd request op vectors.
/*
 * Allocate a zero-terminated vector of num_ops+1 osd ops and initialize
 * op[0] with the opcode and payload length; extent offset/length are
 * filled in later by calc_raw_layout().  Returns 0 / -ENOMEM (elided).
 */
788 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
793 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
797 (*ops)[0].op = opcode;
799 * op extent offset and length will be set later on
800 * in calc_raw_layout()
802 (*ops)[0].payload_len = payload_len;
/* Free an op vector built by rbd_create_rw_ops() (kfree elided). */
806 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Record completion of segment @index of a collection and complete the
 * parent block request in order: with no collection the whole request
 * ends immediately; otherwise statuses are marked done and every
 * contiguous finished prefix [min,max) is ended under the queue lock.
 */
811 static void rbd_coll_end_req_index(struct request *rq,
812 struct rbd_req_coll *coll,
816 struct request_queue *q;
819 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
820 coll, index, ret, len);
/* No collection: single-segment request, finish it outright. */
826 blk_end_request(rq, ret, len);
832 spin_lock_irq(q->queue_lock);
833 coll->status[index].done = 1;
834 coll->status[index].rc = ret;
835 coll->status[index].bytes = len;
836 max = min = coll->num_done;
837 while (max < coll->total && coll->status[max].done)
/* Flush every segment that is now contiguously complete. */
840 for (i = min; i<max; i++) {
841 __blk_end_request(rq, coll->status[i].rc,
842 coll->status[i].bytes);
844 kref_put(&coll->kref, rbd_coll_release);
846 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper used from the osd completion callback. */
849 static void rbd_coll_end_req(struct rbd_request *req,
852 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
856 * Send ceph osd request
/*
 * Build and submit one OSD request for object @obj covering bytes
 * [ofs, ofs+len).  Handles layout calculation, optional lingering
 * (watch) requests, and either async completion via @rbd_cb or a
 * synchronous wait when no callback is given.  On failure the cloned
 * bio chain is dropped and the collection slot is completed with the
 * error.
 */
858 static int rbd_do_request(struct request *rq,
859 struct rbd_device *dev,
860 struct ceph_snap_context *snapc,
862 const char *obj, u64 ofs, u64 len,
867 struct ceph_osd_req_op *ops,
869 struct rbd_req_coll *coll,
871 void (*rbd_cb)(struct ceph_osd_request *req,
872 struct ceph_msg *msg),
873 struct ceph_osd_request **linger_req,
876 struct ceph_osd_request *req;
877 struct ceph_file_layout *layout;
880 struct timespec mtime = CURRENT_TIME;
881 struct rbd_request *req_data;
882 struct ceph_osd_request_head *reqhead;
883 struct ceph_osd_client *osdc;
885 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failed: fail this segment of the collection. */
888 rbd_coll_end_req_index(rq, coll, coll_index,
894 req_data->coll = coll;
895 req_data->coll_index = coll_index;
/* NOTE(review): format string says obj/ofs/len but arguments pass
 * len before ofs — the printed values are swapped. */
898 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
/* Header must not change under us while building the request. */
900 down_read(&dev->header_rwsem);
902 osdc = &dev->rbd_client->client->osdc;
903 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
904 false, GFP_NOIO, pages, bio);
906 up_read(&dev->header_rwsem);
911 req->r_callback = rbd_cb;
915 req_data->pages = pages;
918 req->r_priv = req_data;
920 reqhead = req->r_request->front.iov_base;
921 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
923 strncpy(req->r_oid, obj, sizeof(req->r_oid));
924 req->r_oid_len = strlen(req->r_oid);
/* One object per "stripe": unit == object size, single stripe. */
926 layout = &req->r_file_layout;
927 memset(layout, 0, sizeof(*layout));
928 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
929 layout->fl_stripe_count = cpu_to_le32(1);
930 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_pg_pool = cpu_to_le32(dev->pool_id);
932 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
935 ceph_osdc_build_request(req, ofs, &len,
939 req->r_oid, req->r_oid_len);
940 up_read(&dev->header_rwsem);
/* Watch requests are kept alive ("linger") across osd failover. */
943 ceph_osdc_set_request_linger(osdc, req);
947 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path (no callback): wait and report the replay version. */
952 ret = ceph_osdc_wait_request(osdc, req);
954 *ver = le64_to_cpu(req->r_reassert_version.version);
955 dout("reassert_ver=%lld\n",
956 le64_to_cpu(req->r_reassert_version.version));
957 ceph_osdc_put_request(req);
/* Error unwinding (labels elided). */
962 bio_chain_put(req_data->bio);
963 ceph_osdc_put_request(req);
965 rbd_coll_end_req(req_data, ret, len);
971 * Ceph osd op callback
/*
 * Async completion: decode the reply, zero the unread tail of short
 * or ENOENT reads (sparse objects read as zeros), complete the
 * collection slot, and release the bio chain and request.
 */
973 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
975 struct rbd_request *req_data = req->r_priv;
976 struct ceph_osd_reply_head *replyhead;
977 struct ceph_osd_op *op;
983 replyhead = msg->front.iov_base;
984 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
985 op = (void *)(replyhead + 1);
986 rc = le32_to_cpu(replyhead->result);
987 bytes = le64_to_cpu(op->extent.length);
988 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
990 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
/* Nonexistent object: the whole extent reads as zeros, rc elided->0. */
992 if (rc == -ENOENT && read_op) {
993 zero_bio_chain(req_data->bio, 0);
995 } else if (rc == 0 && read_op && bytes < req_data->len) {
996 zero_bio_chain(req_data->bio, bytes);
997 bytes = req_data->len;
1000 rbd_coll_end_req(req_data, rc, bytes);
1003 bio_chain_put(req_data->bio);
1005 ceph_osdc_put_request(req);
/* Minimal callback for fire-and-forget requests (e.g. notify acks). */
1009 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1011 ceph_osdc_put_request(req);
1015 * Do a synchronous ceph osd operation
/*
 * Synchronous OSD op on a whole object: allocate a page vector for the
 * data, copy @buf in for writes, run rbd_do_request() without a
 * callback (blocking), and copy the result back out for reads.  A
 * caller-provided op vector (@orig_ops) is used as-is, otherwise a
 * single-op vector is built here and destroyed afterwards.
 */
1017 static int rbd_req_sync_op(struct rbd_device *dev,
1018 struct ceph_snap_context *snapc,
1022 struct ceph_osd_req_op *orig_ops,
1027 struct ceph_osd_request **linger_req,
1031 struct page **pages;
1033 struct ceph_osd_req_op *ops = orig_ops;
1036 num_pages = calc_pages_for(ofs , len);
1037 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1039 return PTR_ERR(pages);
/* Only writes carry a payload toward the OSD. */
1042 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1043 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1047 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1048 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1054 ret = rbd_do_request(NULL, dev, snapc, snapid,
1055 obj, ofs, len, NULL,
/* Reads: rbd_do_request's return is the byte count transferred. */
1066 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1067 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1071 rbd_destroy_ops(ops);
1073 ceph_release_page_vector(pages, num_pages);
1078 * Do an asynchronous ceph osd operation
/*
 * Async OSD op for one segment of a block request: resolve the segment
 * object name/offset/length with rbd_get_segment(), build a one-op
 * vector, and fire rbd_do_request() with rbd_req_cb as completion.
 */
1080 static int rbd_do_op(struct request *rq,
1081 struct rbd_device *rbd_dev ,
1082 struct ceph_snap_context *snapc,
1084 int opcode, int flags, int num_reply,
1087 struct rbd_req_coll *coll,
1094 struct ceph_osd_req_op *ops;
1097 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1101 seg_len = rbd_get_segment(&rbd_dev->header,
1102 rbd_dev->header.object_prefix,
1104 seg_name, &seg_ofs);
1106 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1108 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1112 /* we've taken care of segment sizes earlier when we
1113 cloned the bios. We should never have a segment
1114 truncated at this point */
1115 BUG_ON(seg_len < len);
1117 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1118 seg_name, seg_ofs, seg_len,
1125 rbd_req_cb, 0, NULL);
1127 rbd_destroy_ops(ops);
1134 * Request async osd write
/* Async write of one segment at the head (CEPH_NOSNAP). */
1136 static int rbd_req_write(struct request *rq,
1137 struct rbd_device *rbd_dev,
1138 struct ceph_snap_context *snapc,
1141 struct rbd_req_coll *coll,
1144 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1146 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1148 ofs, len, bio, coll, coll_index);
1152 * Request async osd read
/* Async read of one segment; no snap context needed for reads. */
1154 static int rbd_req_read(struct request *rq,
1155 struct rbd_device *rbd_dev,
1159 struct rbd_req_coll *coll,
1162 return rbd_do_op(rq, rbd_dev, NULL,
1167 ofs, len, bio, coll, coll_index);
1171 * Request sync osd read
/* Synchronous read of object bytes into @buf (used for the header). */
1173 static int rbd_req_sync_read(struct rbd_device *dev,
1174 struct ceph_snap_context *snapc,
1181 return rbd_req_sync_op(dev, NULL,
1186 1, obj, ofs, len, buf, NULL, ver);
1190 * Request sync osd watch
/*
 * Acknowledge a received watch notification so the notifier's
 * synchronous notify can complete; fire-and-forget via
 * rbd_simple_req_cb.
 */
1192 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1197 struct ceph_osd_req_op *ops;
1200 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1204 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1205 ops[0].watch.cookie = notify_id;
1206 ops[0].watch.flag = 0;
1208 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1215 rbd_simple_req_cb, 0, NULL);
1217 rbd_destroy_ops(ops);
/*
 * Watch callback: the header object changed (e.g. snapshot created),
 * so re-read the header under ctl_mutex and ack the notification.
 */
1221 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1223 struct rbd_device *dev = (struct rbd_device *)data;
1229 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1230 notify_id, (int)opcode);
1231 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1232 rc = __rbd_refresh_header(dev);
1233 mutex_unlock(&ctl_mutex);
/* Refresh failure is logged but not fatal: we still ack below. */
1235 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1236 " update snaps: %d\n", dev->major, rc);
1238 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1242 * Request sync osd watch
/*
 * Register a lingering watch on the header object.  Creates the osd
 * event first, then issues a WATCH op whose cookie is the event's;
 * on failure the event is cancelled (unwind labels elided).
 */
1244 static int rbd_req_sync_watch(struct rbd_device *dev,
1248 struct ceph_osd_req_op *ops;
1249 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1251 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1255 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1256 (void *)dev, &dev->watch_event);
1260 ops[0].watch.ver = cpu_to_le64(ver);
1261 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1262 ops[0].watch.flag = 1;
1264 ret = rbd_req_sync_op(dev, NULL,
1267 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1270 &dev->watch_request, NULL);
1275 rbd_destroy_ops(ops);
/* Error path: drop the event we registered above. */
1279 ceph_osdc_cancel_event(dev->watch_event);
1280 dev->watch_event = NULL;
1282 rbd_destroy_ops(ops);
1287 * Request sync osd unwatch
/* Tear down the watch: WATCH op with flag=0, then cancel the event. */
1289 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1292 struct ceph_osd_req_op *ops;
1294 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1298 ops[0].watch.ver = 0;
1299 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1300 ops[0].watch.flag = 0;
1302 ret = rbd_req_sync_op(dev, NULL,
1305 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1307 1, obj, 0, 0, NULL, NULL, NULL);
1309 rbd_destroy_ops(ops);
1310 ceph_osdc_cancel_event(dev->watch_event);
1311 dev->watch_event = NULL;
/* Context passed to rbd_notify_cb (only the device is visible here). */
1315 struct rbd_notify_info {
1316 struct rbd_device *dev;
/* Callback for our own notify completing; just logs. */
1319 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1321 struct rbd_device *dev = (struct rbd_device *)data;
1325 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1326 notify_id, (int)opcode);
1330 * Request sync osd notify
/*
 * Synchronously notify all watchers of @obj (e.g. after snap_add):
 * register a one-shot event, send a NOTIFY op carrying its cookie,
 * then wait for the event (i.e. for watchers to ack) with a timeout.
 */
1332 static int rbd_req_sync_notify(struct rbd_device *dev,
1335 struct ceph_osd_req_op *ops;
1336 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1337 struct ceph_osd_event *event;
1338 struct rbd_notify_info info;
1339 int payload_len = sizeof(u32) + sizeof(u32);
1342 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1348 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1349 (void *)&info, &event);
1353 ops[0].watch.ver = 1;
1354 ops[0].watch.flag = 1;
1355 ops[0].watch.cookie = event->cookie;
1356 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
/* NOTE(review): hard-coded 12s here rather than the parsed
 * notify_timeout option — possibly intentional, worth confirming. */
1357 ops[0].watch.timeout = 12;
1359 ret = rbd_req_sync_op(dev, NULL,
1362 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1364 1, obj, 0, 0, NULL, NULL, NULL);
1368 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1369 dout("ceph_osdc_wait_event returned %d\n", ret);
1370 rbd_destroy_ops(ops);
/* Error path: cancel the event we never got to wait on. */
1374 ceph_osdc_cancel_event(event);
1376 rbd_destroy_ops(ops);
1381 * Request sync osd read
/*
 * Invoke an OSD class method ("rbd"/"snap_add" etc.) synchronously on
 * @obj via a CALL op; @data/@len is the marshalled input.  The comment
 * above is a stale copy-paste ("read") — this executes a class method.
 */
1383 static int rbd_req_sync_exec(struct rbd_device *dev,
1391 struct ceph_osd_req_op *ops;
1392 int cls_len = strlen(cls);
1393 int method_len = strlen(method);
1394 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1395 cls_len + method_len + len);
1399 ops[0].cls.class_name = cls;
1400 ops[0].cls.class_len = (__u8)cls_len;
1401 ops[0].cls.method_name = method;
1402 ops[0].cls.method_len = (__u8)method_len;
1403 ops[0].cls.argc = 0;
1404 ops[0].cls.indata = data;
1405 ops[0].cls.indata_len = len;
1407 ret = rbd_req_sync_op(dev, NULL,
1410 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1412 1, obj, 0, 0, NULL, NULL, ver);
1414 rbd_destroy_ops(ops);
1416 dout("cls_exec returned %d\n", ret);
/*
 * Allocate a request collection sized for @num_reqs segment statuses
 * (kref-initialized; freed via rbd_coll_release when the last ref
 * drops).
 */
1420 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1422 struct rbd_req_coll *coll =
1423 kzalloc(sizeof(struct rbd_req_coll) +
1424 sizeof(struct rbd_req_status) * num_reqs,
1429 coll->total = num_reqs;
1430 kref_init(&coll->kref);
1435 * block device queue callback
/*
 * Request-queue worker: pull each block request, split it into
 * per-object segments, clone the bio chain per segment, and issue an
 * async OSD read or write for each, tracked by a shared collection.
 * The queue lock is dropped while talking to the OSD layer and
 * re-taken before fetching the next request.
 */
1437 static void rbd_rq_fn(struct request_queue *q)
1439 struct rbd_device *rbd_dev = q->queuedata;
1441 struct bio_pair *bp = NULL;
1443 while ((rq = blk_fetch_request(q))) {
1445 struct bio *rq_bio, *next_bio = NULL;
1447 int size, op_size = 0;
1449 int num_segs, cur_seg = 0;
1450 struct rbd_req_coll *coll;
1452 /* peek at request from block layer */
1456 dout("fetched request\n");
1458 /* filter out block requests we don't understand */
1459 if ((rq->cmd_type != REQ_TYPE_FS)) {
1460 __blk_end_request_all(rq, 0);
1464 /* deduce our operation (read, write) */
1465 do_write = (rq_data_dir(rq) == WRITE);
1467 size = blk_rq_bytes(rq);
1468 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1470 if (do_write && rbd_dev->read_only) {
1471 __blk_end_request_all(rq, -EROFS);
/* OSD submission may sleep; release the queue lock first. */
1475 spin_unlock_irq(q->queue_lock);
1477 dout("%s 0x%x bytes at 0x%llx\n",
1478 do_write ? "write" : "read",
1479 size, blk_rq_pos(rq) * SECTOR_SIZE);
1481 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1482 coll = rbd_alloc_coll(num_segs);
/* Allocation failure: fail the whole request under the lock. */
1484 spin_lock_irq(q->queue_lock);
1485 __blk_end_request_all(rq, -ENOMEM);
1490 /* a bio clone to be passed down to OSD req */
1491 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1492 op_size = rbd_get_segment(&rbd_dev->header,
1493 rbd_dev->header.object_prefix,
/* One collection ref per in-flight segment request. */
1496 kref_get(&coll->kref);
1497 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1498 op_size, GFP_ATOMIC);
1500 rbd_coll_end_req_index(rq, coll, cur_seg,
1506 /* init OSD command: write or read */
1508 rbd_req_write(rq, rbd_dev,
1509 rbd_dev->header.snapc,
1514 rbd_req_read(rq, rbd_dev,
/* Drop the allocation ref; outstanding segments keep it alive. */
1527 kref_put(&coll->kref, rbd_coll_release);
1530 bio_pair_release(bp);
1531 spin_lock_irq(q->queue_lock);
1536 * a queue callback. Makes sure that we don't create a bio that spans across
1537 * multiple osd objects. One exception would be with a single page bios,
1538 * which we handle later at bio_chain_clone
/*
 * merge_bvec callback: limit bio growth so a bio never crosses an
 * object (chunk) boundary; returns how many bytes of @bvec may be
 * added.  Empty bios are always allowed to take one vector.
 */
1540 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1541 struct bio_vec *bvec)
1543 struct rbd_device *rbd_dev = q->queuedata;
1544 unsigned int chunk_sectors;
1546 unsigned int bio_sectors;
1549 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1550 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1551 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1553 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1554 + bio_sectors)) << SECTOR_SHIFT;
1556 max = 0; /* bio_add cannot handle a negative return */
1557 if (max <= bvec->bv_len && bio_sectors == 0)
1558 return bvec->bv_len;
/* Tear down gendisk + queue and free the in-memory header. */
1562 static void rbd_free_disk(struct rbd_device *rbd_dev)
1564 struct gendisk *disk = rbd_dev->disk;
1569 rbd_header_free(&rbd_dev->header);
1571 if (disk->flags & GENHD_FL_UP)
1574 blk_cleanup_queue(disk->queue);
1579 * reload the ondisk the header
/*
 * Read the image header object from the OSDs and translate it.  The
 * (elided) surrounding loop first reads a minimal header to learn the
 * snapshot count, then re-reads with a buffer sized for all snapshot
 * records and names, retrying if the count changed in between.
 */
1581 static int rbd_read_header(struct rbd_device *rbd_dev,
1582 struct rbd_image_header *header)
1585 struct rbd_image_header_ondisk *dh;
1591 * First reads the fixed-size header to determine the number
1592 * of snapshots, then re-reads it, along with all snapshot
1593 * records as well as their stored names.
1597 dh = kmalloc(len, GFP_KERNEL);
1601 rc = rbd_req_sync_read(rbd_dev,
1603 rbd_dev->obj_md_name,
1609 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1612 pr_warning("unrecognized header format"
1613 " for image %s", rbd_dev->obj);
/* Snap count unchanged since the probe read: header is complete. */
1617 if (snap_count == header->total_snaps)
1620 snap_count = header->total_snaps;
1621 len = sizeof (*dh) +
1622 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1623 header->snap_names_len;
1625 rbd_header_free(header);
1628 header->obj_version = ver;
/*
 * Create a snapshot of the mapped image: obtain a fresh snap id from
 * the monitors, encode (name, id) and call the "rbd.snap_add" class
 * method on the header object; on success bump the in-memory snap seq
 * under header_rwsem.  Only valid when mapping the head.
 */
1638 static int rbd_header_add_snap(struct rbd_device *dev,
1639 const char *snap_name,
1642 int name_len = strlen(snap_name);
1647 struct ceph_mon_client *monc;
1649 /* we should create a snapshot only if we're pointing at the head */
1650 if (dev->snap_id != CEPH_NOSNAP)
1653 monc = &dev->rbd_client->client->monc;
1654 ret = ceph_monc_create_snapid(monc, dev->pool_id, &new_snapid);
1655 dout("created snapid=%lld\n", new_snapid);
1659 data = kmalloc(name_len + 16, gfp_flags);
1664 e = data + name_len + 16;
/* bad: label (elided) for encode overruns. */
1666 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1667 ceph_encode_64_safe(&p, e, new_snapid, bad);
1669 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1670 data, p - data, &ver);
1677 down_write(&dev->header_rwsem);
1678 dev->header.snapc->seq = new_snapid;
1679 up_write(&dev->header_rwsem);
/* Unregister every snapshot device hanging off this image. */
1686 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1688 struct rbd_snap *snap;
1690 while (!list_empty(&rbd_dev->snaps)) {
1691 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1692 __rbd_remove_snap_dev(rbd_dev, snap);
1697 * only read the first part of the ondisk header, without the snaps info
/*
 * Re-read the header after a change notification and swap the fresh
 * copy in under header_rwsem, preserving snapc->seq when we were
 * tracking the head; then rebuild the snapshot device list.  The old
 * name/size/snapc arrays are freed; object_prefix is reused (fresh
 * copy freed after the WARN_ON equality check).
 */
1699 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1702 struct rbd_image_header h;
1706 ret = rbd_read_header(rbd_dev, &h);
/* NOTE(review): set_capacity before taking header_rwsem — racy vs.
 * concurrent readers of the header; confirm intended. */
1711 set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1713 down_write(&rbd_dev->header_rwsem);
1715 snap_seq = rbd_dev->header.snapc->seq;
1716 if (rbd_dev->header.total_snaps &&
1717 rbd_dev->header.snapc->snaps[0] == snap_seq)
1718 /* pointing at the head, will need to follow that
1722 /* rbd_dev->header.object_prefix shouldn't change */
1723 kfree(rbd_dev->header.snap_sizes);
1724 kfree(rbd_dev->header.snap_names);
1725 kfree(rbd_dev->header.snapc);
1727 rbd_dev->header.total_snaps = h.total_snaps;
1728 rbd_dev->header.snapc = h.snapc;
1729 rbd_dev->header.snap_names = h.snap_names;
1730 rbd_dev->header.snap_names_len = h.snap_names_len;
1731 rbd_dev->header.snap_sizes = h.snap_sizes;
1732 /* Free the extra copy of the object prefix */
1733 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1734 kfree(h.object_prefix);
/* Follow the head to the newest snapshot, or keep the old seq. */
1737 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1739 rbd_dev->header.snapc->seq = snap_seq;
1741 ret = __rbd_init_snaps_header(rbd_dev);
1743 up_write(&rbd_dev->header_rwsem);
/*
 * Fetch the image header from the OSDs, then create and configure the
 * gendisk + request queue for this mapping and announce it.
 */
1748 static int rbd_init_disk(struct rbd_device *rbd_dev)
1750 struct gendisk *disk;
1751 struct request_queue *q;
1756 /* contact OSD, request size info about the object being mapped */
1757 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1761 /* no need to lock here, as rbd_dev is not registered yet */
1762 rc = __rbd_init_snaps_header(rbd_dev);
/* Resolve the requested snapshot (or head) and get its size. */
1766 rc = rbd_header_set_snap(rbd_dev, &total_size);
1770 /* create gendisk info */
1772 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1776 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1778 disk->major = rbd_dev->major;
1779 disk->first_minor = 0;
1780 disk->fops = &rbd_bd_ops;
1781 disk->private_data = rbd_dev;
/* init rq: rbd_rq_fn services requests, guarded by rbd_dev->lock. */
1785 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1789 /* We use the default size, but let's be explicit about it. */
1790 blk_queue_physical_block_size(q, SECTOR_SIZE);
1792 /* set io sizes to object size */
/* One RBD object = one segment: cap each request at object size. */
1793 segment_size = rbd_obj_bytes(&rbd_dev->header);
1794 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1795 blk_queue_max_segment_size(q, segment_size);
1796 blk_queue_io_min(q, segment_size);
1797 blk_queue_io_opt(q, segment_size);
/* Prevent bios from straddling object boundaries. */
1799 blk_queue_merge_bvec(q, rbd_merge_bvec);
1802 q->queuedata = rbd_dev;
1804 rbd_dev->disk = disk;
1807 /* finally, announce the disk to the world */
1808 set_capacity(disk, total_size / SECTOR_SIZE);
1811 pr_info("%s: added with size 0x%llx\n",
1812 disk->disk_name, (unsigned long long)total_size);
/* Map a sysfs struct device back to its containing rbd_device. */
1825 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1827 return container_of(dev, struct rbd_device, dev);
/* sysfs "size" attribute: image size in bytes. */
1830 static ssize_t rbd_size_show(struct device *dev,
1831 struct device_attribute *attr, char *buf)
1833 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1835 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
/* sysfs "major" attribute: block device major number. */
1838 static ssize_t rbd_major_show(struct device *dev,
1839 struct device_attribute *attr, char *buf)
1841 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1843 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id" attribute: ceph global id of our client instance. */
1846 static ssize_t rbd_client_id_show(struct device *dev,
1847 struct device_attribute *attr, char *buf)
1849 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1851 return sprintf(buf, "client%lld\n",
1852 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool" attribute: name of the rados pool backing this image. */
1855 static ssize_t rbd_pool_show(struct device *dev,
1856 struct device_attribute *attr, char *buf)
1858 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1860 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id" attribute: numeric id of the backing pool. */
1863 static ssize_t rbd_pool_id_show(struct device *dev,
1864 struct device_attribute *attr, char *buf)
1866 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1868 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name" attribute: rbd image (object) name. */
1871 static ssize_t rbd_name_show(struct device *dev,
1872 struct device_attribute *attr, char *buf)
1874 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1876 return sprintf(buf, "%s\n", rbd_dev->obj);
/* sysfs "current_snap" attribute: mapped snapshot name ("-" for head). */
1879 static ssize_t rbd_snap_show(struct device *dev,
1880 struct device_attribute *attr,
1883 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1885 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/*
 * sysfs "refresh" store handler: re-read the image header from the
 * cluster.  Serialized against other control operations via ctl_mutex.
 */
1888 static ssize_t rbd_image_refresh(struct device *dev,
1889 struct device_attribute *attr,
1893 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1897 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1899 rc = __rbd_refresh_header(rbd_dev);
1903 mutex_unlock(&ctl_mutex);
/*
 * Per-image sysfs attributes (Documentation/ABI/testing/sysfs-bus-rbd):
 * read-only status attributes plus the write-only "refresh" and
 * "create_snap" control files.
 */
1907 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1908 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1909 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1910 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1911 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1912 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1913 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1914 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1915 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1917 static struct attribute *rbd_attrs[] = {
1918 &dev_attr_size.attr,
1919 &dev_attr_major.attr,
1920 &dev_attr_client_id.attr,
1921 &dev_attr_pool.attr,
1922 &dev_attr_pool_id.attr,
1923 &dev_attr_name.attr,
1924 &dev_attr_current_snap.attr,
1925 &dev_attr_refresh.attr,
1926 &dev_attr_create_snap.attr,
1930 static struct attribute_group rbd_attr_group = {
1934 static const struct attribute_group *rbd_attr_groups[] = {
/* Empty release: lifetime is managed by rbd_dev_release on the bus. */
1939 static void rbd_sysfs_dev_release(struct device *dev)
1943 static struct device_type rbd_device_type = {
1945 .groups = rbd_attr_groups,
1946 .release = rbd_sysfs_dev_release,
/* Per-snapshot sysfs "snap_size" attribute: snapshot size in bytes. */
1954 static ssize_t rbd_snap_size_show(struct device *dev,
1955 struct device_attribute *attr,
1958 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1960 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* Per-snapshot sysfs "snap_id" attribute: rados snapshot id. */
1963 static ssize_t rbd_snap_id_show(struct device *dev,
1964 struct device_attribute *attr,
1967 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1969 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* sysfs attributes and device type for per-snapshot child devices. */
1972 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1973 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1975 static struct attribute *rbd_snap_attrs[] = {
1976 &dev_attr_snap_size.attr,
1977 &dev_attr_snap_id.attr,
1981 static struct attribute_group rbd_snap_attr_group = {
1982 .attrs = rbd_snap_attrs,
/* Release hook: frees the rbd_snap when its device refcount drops. */
1985 static void rbd_snap_dev_release(struct device *dev)
1987 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1992 static const struct attribute_group *rbd_snap_attr_groups[] = {
1993 &rbd_snap_attr_group,
1997 static struct device_type rbd_snap_device_type = {
1998 .groups = rbd_snap_attr_groups,
1999 .release = rbd_snap_dev_release,
/*
 * Unlink a snapshot from the device's list and unregister its sysfs
 * device; the rbd_snap itself is freed by rbd_snap_dev_release.
 */
2002 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2003 struct rbd_snap *snap)
2005 list_del(&snap->node);
2006 device_unregister(&snap->dev);
/*
 * Register the sysfs device for one snapshot under @parent, named
 * "snap_<name>".
 */
2009 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2010 struct rbd_snap *snap,
2011 struct device *parent)
2013 struct device *dev = &snap->dev;
2016 dev->type = &rbd_snap_device_type;
2017 dev->parent = parent;
/* NOTE(review): dev->release is also set via rbd_snap_device_type;
 * setting it here too looks redundant -- confirm against full file. */
2018 dev->release = rbd_snap_dev_release;
2019 dev_set_name(dev, "snap_%s", snap->name);
2020 ret = device_register(dev);
/*
 * Allocate an rbd_snap for header entry @i with name @name, and if the
 * parent rbd device is already in sysfs, register the snap device too.
 * On success *snapp points at the new snap (caller links it in).
 */
2025 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2026 int i, const char *name,
2027 struct rbd_snap **snapp)
2030 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2033 snap->name = kstrdup(name, GFP_KERNEL);
2034 snap->size = rbd_dev->header.snap_sizes[i];
2035 snap->id = rbd_dev->header.snapc->snaps[i];
/* During rbd_init_disk the parent isn't registered yet; defer then. */
2036 if (device_is_registered(&rbd_dev->dev)) {
2037 ret = rbd_register_snap_dev(rbd_dev, snap,
2051 * search for the previous snap in a null delimited string list
/*
 * Step backward one entry in the NUL-delimited name list; @start is the
 * beginning of the list.  Presumably returns NULL when there is no
 * previous name -- the return path is outside this view, TODO confirm.
 */
2053 const char *rbd_prev_snap_name(const char *name, const char *start)
2055 if (name < start + 2)
2068 * compare the old list of snapshots that we have to what's in the header
2069 * and update it accordingly. Note that the header holds the snapshots
2070 * in a reverse order (from newest to oldest) and we need to go from
2071 * older to new so that we don't get a duplicate snap name when
2072 * doing the process (e.g., removed snapshot and recreated a new
2073 * one with the same name.
2075 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2077 const char *name, *first_name;
/* i counts down through header entries; header index i-1 is "current". */
2078 int i = rbd_dev->header.total_snaps;
2079 struct rbd_snap *snap, *old_snap = NULL;
2081 struct list_head *p, *n;
/* Names start at snap_names; walk them backward from the end. */
2083 first_name = rbd_dev->header.snap_names;
2084 name = first_name + rbd_dev->header.snap_names_len;
/* Walk the existing snap list oldest-first (reverse order, safely). */
2086 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2089 old_snap = list_entry(p, struct rbd_snap, node);
2092 cur_id = rbd_dev->header.snapc->snaps[i - 1];
/* Header no longer mentions old_snap->id: it was deleted remotely. */
2094 if (!i || old_snap->id < cur_id) {
2095 /* old_snap->id was skipped, thus was removed */
2096 __rbd_remove_snap_dev(rbd_dev, old_snap);
2099 if (old_snap->id == cur_id) {
2100 /* we have this snapshot already */
2102 name = rbd_prev_snap_name(name, first_name);
/* Inner loop: insert header entries newer than old_snap before it. */
2106 i--, name = rbd_prev_snap_name(name, first_name)) {
2111 cur_id = rbd_dev->header.snapc->snaps[i];
2112 /* snapshot removal? handle it above */
2113 if (cur_id >= old_snap->id)
2115 /* a new snapshot */
2116 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2120 /* note that we add it backward so using n and not p */
2121 list_add(&snap->node, n);
2125 /* we're done going over the old snap list, just add what's left */
2126 for (; i > 0; i--) {
2127 name = rbd_prev_snap_name(name, first_name);
2132 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2135 list_add(&snap->node, &rbd_dev->snaps);
/*
 * Register the rbd device on the rbd bus (named by its numeric id) and
 * then register sysfs devices for all snapshots discovered so far.
 */
2141 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2145 struct rbd_snap *snap;
2147 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2148 dev = &rbd_dev->dev;
2150 dev->bus = &rbd_bus_type;
2151 dev->type = &rbd_device_type;
2152 dev->parent = &rbd_root_dev;
2153 dev->release = rbd_dev_release;
2154 dev_set_name(dev, "%d", rbd_dev->id);
2155 ret = device_register(dev);
/* Snaps added before registration were deferred; register them now. */
2159 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2160 ret = rbd_register_snap_dev(rbd_dev, snap,
2166 mutex_unlock(&ctl_mutex);
/*
 * Remove the device from sysfs; final teardown happens in
 * rbd_dev_release once the last reference is dropped.
 */
2170 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2172 device_unregister(&rbd_dev->dev);
/*
 * Set up a watch on the image header object.  -ERANGE indicates our
 * cached header version is stale, so refresh the header (under
 * ctl_mutex) and retry until the watch sticks.
 */
2175 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2180 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2181 rbd_dev->header.obj_version);
2182 if (ret == -ERANGE) {
2183 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2184 rc = __rbd_refresh_header(rbd_dev);
2185 mutex_unlock(&ctl_mutex);
2189 } while (ret == -ERANGE);
/* Highest rbd id handed out so far; see rbd_id_put for maintenance. */
2194 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2197 * Get a unique rbd identifier for the given new rbd_dev, and add
2198 * the rbd_dev to the global list. The minimum rbd id is 1.
2200 static void rbd_id_get(struct rbd_device *rbd_dev)
2202 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2204 spin_lock(&rbd_dev_list_lock);
2205 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2206 spin_unlock(&rbd_dev_list_lock);
2210 * Remove an rbd_dev from the global list, and record that its
2211 * identifier is no longer in use.
2213 static void rbd_id_put(struct rbd_device *rbd_dev)
2215 struct list_head *tmp;
2216 int rbd_id = rbd_dev->id;
2221 spin_lock(&rbd_dev_list_lock);
2222 list_del_init(&rbd_dev->node);
2225 * If the id being "put" is not the current maximum, there
2226 * is nothing special we need to do.
2228 if (rbd_id != atomic64_read(&rbd_id_max)) {
2229 spin_unlock(&rbd_dev_list_lock);
2234 * We need to update the current maximum id. Search the
2235 * list to find out what it is. We're more likely to find
2236 * the maximum at the end, so search the list backward.
2239 list_for_each_prev(tmp, &rbd_dev_list) {
2240 struct rbd_device *rbd_dev;
2242 rbd_dev = list_entry(tmp, struct rbd_device, node);
/* NOTE(review): this compares the released id, not this entry's id;
 * the max_id update presumably uses the entry's id -- lines hidden
 * from this view, confirm against the full file. */
2243 if (rbd_id > max_id)
2246 spin_unlock(&rbd_dev_list_lock);
2249 * The max id could have been updated by rbd_id_get(), in
2250 * which case it now accurately reflects the new maximum.
2251 * Be careful not to overwrite the maximum value in that
/* cmpxchg only lowers rbd_id_max if it still equals the freed id. */
2254 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2258 * Skips over white space at *buf, and updates *buf to point to the
2259 * first found non-space character (if any). Returns the length of
2260 * the token (string of non-white space characters) found. Note
2261 * that *buf must be terminated with '\0'.
2263 static inline size_t next_token(const char **buf)
2266 * These are the characters that produce nonzero for
2267 * isspace() in the "C" and "POSIX" locales.
2269 const char *spaces = " \f\n\r\t\v";
2271 *buf += strspn(*buf, spaces); /* Find start of token */
2273 return strcspn(*buf, spaces); /* Return token length */
2277 * Finds the next token in *buf, and if the provided token buffer is
2278 * big enough, copies the found token into it. The result, if
2279 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2280 * must be terminated with '\0' on entry.
2282 * Returns the length of the token found (not including the '\0').
2283 * Return value will be 0 if no token is found, and it will be >=
2284 * token_size if the token would not fit.
2286 * The *buf pointer will be updated to point beyond the end of the
2287 * found token. Note that this occurs even if the token buffer is
2288 * too small to hold it.
2290 static inline size_t copy_token(const char **buf,
2296 len = next_token(buf);
/* Copy only when it fits; caller detects overflow via len >= size. */
2297 if (len < token_size) {
2298 memcpy(token, *buf, len);
2299 *(token + len) = '\0';
2307 * Finds the next token in *buf, dynamically allocates a buffer big
2308 * enough to hold a copy of it, and copies the token into the new
2309 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2310 * that a duplicate buffer is created even for a zero-length token.
2312 * Returns a pointer to the newly-allocated duplicate, or a null
2313 * pointer if memory for the duplicate was not available. If
2314 * the lenp argument is a non-null pointer, the length of the token
2315 * (not including the '\0') is returned in *lenp.
2317 * If successful, the *buf pointer will be updated to point beyond
2318 * the end of the found token.
2320 * Note: uses GFP_KERNEL for allocation.
2322 static inline char *dup_token(const char **buf, size_t *lenp)
2327 len = next_token(buf);
/* +1 for the terminating NUL appended below. */
2328 dup = kmalloc(len + 1, GFP_KERNEL);
2332 memcpy(dup, *buf, len);
2333 *(dup + len) = '\0';
2343 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2344 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2345 * on the list of monitor addresses and other options provided via
2348 * Note: rbd_dev is assumed to have been initially zero-filled.
2350 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2352 const char **mon_addrs,
2353 size_t *mon_addrs_size,
2355 size_t options_size)
2360 /* The first four tokens are required */
/* Token 1: monitor address list (returned by pointer, not copied). */
2362 len = next_token(&buf)
2365 *mon_addrs_size = len + 1;
/* Token 2: mount-style options string, copied into caller's buffer. */
2370 len = copy_token(&buf, options, options_size);
2371 if (!len || len >= options_size)
/* Tokens 3 and 4: pool name and image name, heap-duplicated. */
2375 rbd_dev->pool_name = dup_token(&buf, NULL);
2376 if (!rbd_dev->pool_name)
2379 rbd_dev->obj = dup_token(&buf, &rbd_dev->obj_len);
2383 /* Create the name of the header object */
2385 rbd_dev->obj_md_name = kmalloc(rbd_dev->obj_len
2386 + sizeof (RBD_SUFFIX),
2388 if (!rbd_dev->obj_md_name)
2390 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2393 * The snapshot name is optional, but it's an error if it's
2394 * too long. If no snapshot is supplied, fill in the default.
2396 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
/* No snapshot given: map the head ("-", RBD_SNAP_HEAD_NAME). */
2398 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2399 sizeof (RBD_SNAP_HEAD_NAME));
2400 else if (len >= sizeof (rbd_dev->snap_name)) {
/* Error path: undo the allocations made above. */
2408 kfree(rbd_dev->obj_md_name);
2409 kfree(rbd_dev->obj);
2410 kfree(rbd_dev->pool_name);
2411 rbd_dev->pool_name = NULL;
/*
 * Bus "add" handler: parse the user-supplied spec, connect to the
 * cluster, allocate a major, register the device and bring up the
 * block device + watch.  Unwinds in reverse order on failure.
 */
2416 static ssize_t rbd_add(struct bus_type *bus,
2421 struct rbd_device *rbd_dev = NULL;
2422 const char *mon_addrs = NULL;
2423 size_t mon_addrs_size = 0;
2424 struct ceph_osd_client *osdc;
/* Pin the module for the lifetime of the device (dropped in release). */
2427 if (!try_module_get(THIS_MODULE))
2430 options = kmalloc(count, GFP_KERNEL);
2433 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2437 /* static rbd_device initialization */
2438 spin_lock_init(&rbd_dev->lock);
2439 INIT_LIST_HEAD(&rbd_dev->node);
2440 INIT_LIST_HEAD(&rbd_dev->snaps);
2441 init_rwsem(&rbd_dev->header_rwsem);
/* NOTE(review): duplicate init_rwsem of the same semaphore (harmless
 * but redundant) -- remove one in a follow-up. */
2443 init_rwsem(&rbd_dev->header_rwsem);
2445 /* generate unique id: find highest unique id, add one */
2446 rbd_id_get(rbd_dev);
2448 /* Fill in the device name, now that we have its id. */
2449 BUILD_BUG_ON(DEV_NAME_LEN
2450 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2451 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2453 /* parse add command */
2454 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
/* mon_addrs_size includes the NUL, hence the -1 here. */
2459 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2461 if (IS_ERR(rbd_dev->rbd_client)) {
2462 rc = PTR_ERR(rbd_dev->rbd_client);
/* Resolve the pool name to its numeric id via the osdmap. */
2467 osdc = &rbd_dev->rbd_client->client->osdc;
2468 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2470 goto err_out_client;
2471 rbd_dev->pool_id = rc;
2473 /* register our block device */
2474 rc = register_blkdev(0, rbd_dev->name);
2476 goto err_out_client;
2477 rbd_dev->major = rc;
2479 rc = rbd_bus_add_dev(rbd_dev);
2481 goto err_out_blkdev;
2484 * At this point cleanup in the event of an error is the job
2485 * of the sysfs code (initiated by rbd_bus_del_dev()).
2487 * Set up and announce blkdev mapping.
2489 rc = rbd_init_disk(rbd_dev);
2493 rc = rbd_init_watch_dev(rbd_dev);
2500 /* this will also clean up rest of rbd_dev stuff */
2502 rbd_bus_del_dev(rbd_dev);
/* Error unwinding: mirrors setup order in reverse. */
2507 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2509 rbd_put_client(rbd_dev);
2511 if (rbd_dev->pool_name) {
2512 kfree(rbd_dev->obj_md_name);
2513 kfree(rbd_dev->obj);
2514 kfree(rbd_dev->pool_name);
2516 rbd_id_put(rbd_dev);
2521 dout("Error adding device %s\n", buf);
2522 module_put(THIS_MODULE);
2524 return (ssize_t) rc;
/*
 * Look up an rbd_device by numeric id under rbd_dev_list_lock.
 * Returns the device if found (return paths outside this view).
 */
2527 static struct rbd_device *__rbd_get_dev(unsigned long id)
2529 struct list_head *tmp;
2530 struct rbd_device *rbd_dev;
2532 spin_lock(&rbd_dev_list_lock);
2533 list_for_each(tmp, &rbd_dev_list) {
2534 rbd_dev = list_entry(tmp, struct rbd_device, node);
2535 if (rbd_dev->id == id) {
2536 spin_unlock(&rbd_dev_list_lock);
2540 spin_unlock(&rbd_dev_list_lock);
/*
 * Final teardown, called by the driver core when the device's last
 * reference is dropped: cancel the header watch, release the cluster
 * client, free the disk/major, and free all parsed strings.
 */
2544 static void rbd_dev_release(struct device *dev)
2546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop the lingering watch request before unwatching the header. */
2548 if (rbd_dev->watch_request) {
2549 struct ceph_client *client = rbd_dev->rbd_client->client;
2551 ceph_osdc_unregister_linger_request(&client->osdc,
2552 rbd_dev->watch_request);
2554 if (rbd_dev->watch_event)
2555 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2557 rbd_put_client(rbd_dev);
2559 /* clean up and free blkdev */
2560 rbd_free_disk(rbd_dev);
2561 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2563 /* done with the id, and with the rbd_dev */
2564 kfree(rbd_dev->obj_md_name);
2565 kfree(rbd_dev->pool_name);
2566 kfree(rbd_dev->obj);
2567 rbd_id_put(rbd_dev);
2570 /* release module ref */
/* Balances try_module_get() taken in rbd_add(). */
2571 module_put(THIS_MODULE);
/*
 * Bus "remove" handler: parse a device id from @buf and tear down the
 * matching device (snaps first, then the device itself) under
 * ctl_mutex.
 */
2574 static ssize_t rbd_remove(struct bus_type *bus,
2578 struct rbd_device *rbd_dev = NULL;
2583 rc = strict_strtoul(buf, 10, &ul);
2587 /* convert to int; abort if we lost anything in the conversion */
2588 target_id = (int) ul;
2589 if (target_id != ul)
2592 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2594 rbd_dev = __rbd_get_dev(target_id);
2600 __rbd_remove_all_snaps(rbd_dev);
2601 rbd_bus_del_dev(rbd_dev);
2604 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" store handler: create a snapshot named from the
 * written buffer, refresh the header, then notify watchers (outside
 * ctl_mutex to avoid deadlocking against watch callbacks).
 */
2608 static ssize_t rbd_snap_add(struct device *dev,
2609 struct device_attribute *attr,
2613 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2615 char *name = kmalloc(count + 1, GFP_KERNEL);
/* NOTE(review): snprintf with size=count truncates the final byte of
 * the written name even though count+1 was allocated -- looks like an
 * off-by-one; confirm and use count + 1 in a follow-up fix. */
2619 snprintf(name, count, "%s", buf);
2621 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2623 ret = rbd_header_add_snap(rbd_dev,
2628 ret = __rbd_refresh_header(rbd_dev);
2632 /* shouldn't hold ctl_mutex when notifying.. notify might
2633 trigger a watch callback that would need to get that mutex */
2634 mutex_unlock(&ctl_mutex);
2636 /* make a best effort, don't error if failed */
2637 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2644 mutex_unlock(&ctl_mutex);
2650 * create control files in sysfs
/* Register the rbd root device and bus; unwind the device on failure. */
2653 static int rbd_sysfs_init(void)
2657 ret = device_register(&rbd_root_dev);
2661 ret = bus_register(&rbd_bus_type);
2663 device_unregister(&rbd_root_dev);
/* Reverse of rbd_sysfs_init: bus first, then the root device. */
2668 static void rbd_sysfs_cleanup(void)
2670 bus_unregister(&rbd_bus_type);
2671 device_unregister(&rbd_root_dev);
/* Module entry point: set up sysfs control files and announce load. */
2674 int __init rbd_init(void)
2678 rc = rbd_sysfs_init();
2681 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs control files. */
2685 void __exit rbd_exit(void)
2687 rbd_sysfs_cleanup();
2690 module_init(rbd_init);
2691 module_exit(rbd_exit);
2693 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2694 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2695 MODULE_DESCRIPTION("rados block device");
2697 /* following authorship retained from original osdblk.c */
2698 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2700 MODULE_LICENSE("GPL");