rbd: fix integer overflow in rbd_header_from_disk()
[firefly-linux-kernel-4.4.55.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_MD_NAME_LEN     (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
59 #define RBD_MAX_POOL_NAME_LEN   64
60 #define RBD_MAX_SNAP_NAME_LEN   32
61 #define RBD_MAX_OPT_LEN         1024
62
63 #define RBD_SNAP_HEAD_NAME      "-"
64
65 /*
66  * An RBD device name will be "rbd#", where the "rbd" comes from
67  * RBD_DRV_NAME above, and # is a unique integer identifier.
68  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
69  * enough to hold all possible device names.
70  */
71 #define DEV_NAME_LEN            32
72 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
73
74 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75
76 /*
77  * block device image metadata (in-memory version)
78  */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes (from on-disk header) */
	char block_name[32];	/* object name prefix for data objects */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;	/* on-disk encryption type */
	__u8 comp_type;		/* on-disk compression type */
	struct ceph_snap_context *snapc;	/* snapshot context (ids) */
	size_t snap_names_len;	/* total bytes of NUL-separated snap names */
	u64 snap_seq;		/* snapshot sequence number from disk */
	u32 total_snaps;	/* number of snapshots in snapc */

	char *snap_names;	/* NUL-separated snapshot names */
	u64 *snap_sizes;	/* per-snapshot image sizes, same order */

	u64 obj_version;	/* header object version (set by callers) */
};
95
/* options parsed from the mount-style option string; owned by rbd_client */
struct rbd_options {
	int	notify_timeout; /* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
99
100 /*
101  * an instance of the client.  multiple devices may share an rbd client.
102  */
struct rbd_client {
	struct ceph_client	*client;	/* shared ceph client handle */
	struct rbd_options	*rbd_opts;	/* owned; freed on release */
	struct kref		kref;		/* drops into rbd_client_release() */
	struct list_head	node;		/* entry on rbd_client_list */
};
109
110 /*
111  * a request completion status
112  */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion status: 0 or negative errno */
	u64 bytes;	/* number of bytes completed */
};
118
119 /*
120  * a collection of requests
121  */
struct rbd_req_coll {
	int			total;		/* number of sub-requests */
	int			num_done;	/* count completed in order */
	struct kref		kref;		/* drops into rbd_coll_release() */
	struct rbd_req_status	status[0];	/* one slot per sub-request */
};
128
129 /*
130  * a single io request
131  */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* byte length of this request */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* owning collection, or NULL */
};
140
/* in-memory record of a single snapshot */
struct rbd_snap {
	struct	device		dev;	/* sysfs device for this snapshot */
	const char		*name;	/* snapshot name */
	size_t			size;	/* image size at this snapshot */
	struct list_head	node;	/* entry on rbd_device->snaps */
	u64			id;	/* snapshot id */
};
148
149 /*
150  * a single device
151  */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;		/* request queue for disk */

	struct rbd_client	*rbd_client;	/* shared ceph client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* parsed image header */
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;		/* pool id used in osd requests */

	struct ceph_osd_event	*watch_event;	/* header watch registration */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;
	/* mapped snapshot name; RBD_SNAP_HEAD_NAME ("-") means the head */
	char			snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;	/* nonzero when a snapshot is mapped */

	struct list_head	node;		/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
190
191 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list);              /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
206                                   struct rbd_snap *snap);
207
208 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209                        size_t count);
210 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211                           size_t count);
212
/* control files on the rbd bus: /sys/bus/rbd/{add,remove} (root-writable) */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
223
/*
 * Empty on purpose: rbd_root_dev is statically allocated, so there is
 * nothing to free when its last reference is dropped.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
232
233
234 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 {
236         return get_device(&rbd_dev->dev);
237 }
238
239 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 {
241         put_device(&rbd_dev->dev);
242 }
243
244 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
245
246 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 {
248         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249
250         rbd_get_dev(rbd_dev);
251
252         set_device_ro(bdev, rbd_dev->read_only);
253
254         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
255                 return -EROFS;
256
257         return 0;
258 }
259
260 static int rbd_release(struct gendisk *disk, fmode_t mode)
261 {
262         struct rbd_device *rbd_dev = disk->private_data;
263
264         rbd_put_dev(rbd_dev);
265
266         return 0;
267 }
268
/* block device methods for /dev/rbd<N>; no ioctl support */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
274
275 /*
276  * Initialize an rbd client instance.
277  * We own *opt.
278  */
279 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
280                                             struct rbd_options *rbd_opts)
281 {
282         struct rbd_client *rbdc;
283         int ret = -ENOMEM;
284
285         dout("rbd_client_create\n");
286         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287         if (!rbdc)
288                 goto out_opt;
289
290         kref_init(&rbdc->kref);
291         INIT_LIST_HEAD(&rbdc->node);
292
293         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
295         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
296         if (IS_ERR(rbdc->client))
297                 goto out_mutex;
298         opt = NULL; /* Now rbdc->client is responsible for opt */
299
300         ret = ceph_open_session(rbdc->client);
301         if (ret < 0)
302                 goto out_err;
303
304         rbdc->rbd_opts = rbd_opts;
305
306         spin_lock(&rbd_client_list_lock);
307         list_add_tail(&rbdc->node, &rbd_client_list);
308         spin_unlock(&rbd_client_list_lock);
309
310         mutex_unlock(&ctl_mutex);
311
312         dout("rbd_client_create created %p\n", rbdc);
313         return rbdc;
314
315 out_err:
316         ceph_destroy_client(rbdc->client);
317 out_mutex:
318         mutex_unlock(&ctl_mutex);
319         kfree(rbdc);
320 out_opt:
321         if (opt)
322                 ceph_destroy_options(opt);
323         return ERR_PTR(ret);
324 }
325
326 /*
327  * Find a ceph client with specific addr and configuration.
328  */
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
{
	struct rbd_client *client_node;

	/* caller must hold rbd_client_list_lock */

	/* clients created with CEPH_OPT_NOSHARE are never reused */
	if (opt->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (ceph_compare_options(opt, client_node->client) == 0)
			return client_node;
	return NULL;
}
341
342 /*
343  * mount options
344  */
/*
 * Option tokens for parse_rbd_opts_token().  Tokens below Opt_last_int
 * take an integer argument; tokens between Opt_last_int and
 * Opt_last_string take a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
359
/*
 * Parse one "name" or "name=value" option into the rbd_options that
 * `private` points at.  Returns 0 on success or a negative errno.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbdopt = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbdopt_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* decode the argument; intval is only set for integer tokens */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbdopt->notify_timeout = intval;
		break;
	default:
		/* every token in rbdopt_tokens must be handled above */
		BUG_ON(token);
	}
	return 0;
}
394
395 /*
396  * Get a ceph client with specific addr and configuration, if one does
397  * not exist create it.
398  */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	opt = ceph_parse_options(options, mon_addr,
				mon_addr + mon_addr_len,
				parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(opt)) {
		kfree(rbd_opts);
		return ERR_CAST(opt);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* the shared client already holds equivalent options */
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/* rbd_client_create() consumes opt; rbd_opts only on success */
	rbdc = rbd_client_create(opt, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
442
443 /*
444  * Destroy ceph client
445  *
446  * Caller must hold rbd_client_list_lock.
447  */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* unlink from rbd_client_list; this path takes the lock itself */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
461
462 /*
463  * Drop reference to ceph client node. If it's not referenced anymore, release
464  * it.
465  */
466 static void rbd_put_client(struct rbd_device *rbd_dev)
467 {
468         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
469         rbd_dev->rbd_client = NULL;
470 }
471
472 /*
473  * Destroy requests collection
474  */
475 static void rbd_coll_release(struct kref *kref)
476 {
477         struct rbd_req_coll *coll =
478                 container_of(kref, struct rbd_req_coll, kref);
479
480         dout("rbd_coll_release %p\n", coll);
481         kfree(coll);
482 }
483
484 /*
485  * Create a new header structure, translate header format from the on-disk
486  * header.
487  */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps,
				 gfp_t gfp_flags)
{
	u32 i, snap_count;

	/* reject anything that does not start with the rbd magic text */
	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	/*
	 * snap_count comes off disk; bound it so the size computed for
	 * the snap context allocation below cannot overflow.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof (*ondisk),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	/*
	 * NOTE(review): snap_names_len is a u64 read straight off disk
	 * and used as a kmalloc()/memcpy() size without validation;
	 * with a 32-bit size_t the assignment truncates — confirm
	 * callers bound it.
	 */
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}
	memcpy(header->block_name, ondisk->block_name,
	       sizeof(ondisk->block_name));

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	/*
	 * Only fill in per-snapshot data when the caller's ondisk
	 * buffer was sized for exactly this many snapshots; presumably
	 * the caller retries with a larger buffer otherwise.
	 */
	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[i],
			header->snap_names_len);
	}

	return 0;

err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
556
557 static int snap_index(struct rbd_image_header *header, int snap_num)
558 {
559         return header->total_snaps - snap_num;
560 }
561
562 static u64 cur_snap_id(struct rbd_device *rbd_dev)
563 {
564         struct rbd_image_header *header = &rbd_dev->header;
565
566         if (!rbd_dev->cur_snap)
567                 return 0;
568
569         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
570 }
571
572 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
573                         u64 *seq, u64 *size)
574 {
575         int i;
576         char *p = header->snap_names;
577
578         for (i = 0; i < header->total_snaps; i++) {
579                 if (!strcmp(snap_name, p)) {
580
581                         /* Found it.  Pass back its id and/or size */
582
583                         if (seq)
584                                 *seq = header->snapc->snaps[i];
585                         if (size)
586                                 *size = header->snap_sizes[i];
587                         return i;
588                 }
589                 p += strlen(p) + 1;     /* Skip ahead to the next name */
590         }
591         return -ENOENT;
592 }
593
/*
 * Select the snapshot (or head) named in dev->snap_name, setting
 * cur_snap/read_only accordingly and optionally returning the mapped
 * size via *size.  Returns 0 on success, -ENOENT for an unknown name.
 */
static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));

	down_write(&dev->header_rwsem);

	/* RBD_SNAP_HEAD_NAME ("-") maps the head, writable */
	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->cur_snap = 0;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		/* a named snapshot maps read-only */
		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;

		/* cur_snap counts 1-based from the end of the snap list */
		dev->cur_snap = header->total_snaps - ret;
		dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&dev->header_rwsem);
	return ret;
}
628
629 static void rbd_header_free(struct rbd_image_header *header)
630 {
631         kfree(header->snapc);
632         kfree(header->snap_names);
633         kfree(header->snap_sizes);
634 }
635
636 /*
637  * get the actual striped segment name, offset and length
638  */
639 static u64 rbd_get_segment(struct rbd_image_header *header,
640                            const char *block_name,
641                            u64 ofs, u64 len,
642                            char *seg_name, u64 *segofs)
643 {
644         u64 seg = ofs >> header->obj_order;
645
646         if (seg_name)
647                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
648                          "%s.%012llx", block_name, seg);
649
650         ofs = ofs & ((1 << header->obj_order) - 1);
651         len = min_t(u64, len, (1 << header->obj_order) - ofs);
652
653         if (segofs)
654                 *segofs = ofs;
655
656         return len;
657 }
658
659 static int rbd_get_num_segments(struct rbd_image_header *header,
660                                 u64 ofs, u64 len)
661 {
662         u64 start_seg = ofs >> header->obj_order;
663         u64 end_seg = (ofs + len - 1) >> header->obj_order;
664         return end_seg - start_seg + 1;
665 }
666
667 /*
668  * returns the size of an object in the image
669  */
670 static u64 rbd_obj_bytes(struct rbd_image_header *header)
671 {
672         return 1 << header->obj_order;
673 }
674
675 /*
676  * bio helpers
677  */
678
679 static void bio_chain_put(struct bio *chain)
680 {
681         struct bio *tmp;
682
683         while (chain) {
684                 tmp = chain;
685                 chain = chain->bi_next;
686                 bio_put(tmp);
687         }
688 }
689
690 /*
691  * zeros a bio chain, starting at specific offset
692  */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/*
				 * Zero from start_ofs (or the segment
				 * start, whichever is later) to the end
				 * of this segment.
				 */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
716
717 /*
718  * bio_chain_clone - clone a chain of bios up to a certain length.
719  * might return a bio_pair that will need to be released.
720  */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any split left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			/*
			 * NOTE(review): this local bp shadows the *bp
			 * parameter; the caller's bio_pair pointer is
			 * never updated with the new split — confirm
			 * who is expected to release it.
			 */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation in the chain may sleep */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* the source chain must have covered the requested length */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
793
794 /*
795  * helpers for osd request op vectors.
796  */
797 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
798                             int num_ops,
799                             int opcode,
800                             u32 payload_len)
801 {
802         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
803                        GFP_NOIO);
804         if (!*ops)
805                 return -ENOMEM;
806         (*ops)[0].op = opcode;
807         /*
808          * op extent offset and length will be set later on
809          * in calc_raw_layout()
810          */
811         (*ops)[0].payload_len = payload_len;
812         return 0;
813 }
814
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
819
/*
 * Record completion of sub-request `index` of a collection and end
 * the blk-layer request for every in-order prefix of sub-requests
 * that has now finished.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	/* without a collection, the whole request completes at once */
	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	/* advance max past the contiguous run of finished sub-requests */
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* each completed sub-request drops its collection ref */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
857
/* Complete this request's slot in its collection (if any). */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
863
864 /*
865  * Send ceph osd request
866  */
867 static int rbd_do_request(struct request *rq,
868                           struct rbd_device *dev,
869                           struct ceph_snap_context *snapc,
870                           u64 snapid,
871                           const char *obj, u64 ofs, u64 len,
872                           struct bio *bio,
873                           struct page **pages,
874                           int num_pages,
875                           int flags,
876                           struct ceph_osd_req_op *ops,
877                           int num_reply,
878                           struct rbd_req_coll *coll,
879                           int coll_index,
880                           void (*rbd_cb)(struct ceph_osd_request *req,
881                                          struct ceph_msg *msg),
882                           struct ceph_osd_request **linger_req,
883                           u64 *ver)
884 {
885         struct ceph_osd_request *req;
886         struct ceph_file_layout *layout;
887         int ret;
888         u64 bno;
889         struct timespec mtime = CURRENT_TIME;
890         struct rbd_request *req_data;
891         struct ceph_osd_request_head *reqhead;
892         struct ceph_osd_client *osdc;
893
894         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
895         if (!req_data) {
896                 if (coll)
897                         rbd_coll_end_req_index(rq, coll, coll_index,
898                                                -ENOMEM, len);
899                 return -ENOMEM;
900         }
901
902         if (coll) {
903                 req_data->coll = coll;
904                 req_data->coll_index = coll_index;
905         }
906
907         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
908
909         down_read(&dev->header_rwsem);
910
911         osdc = &dev->rbd_client->client->osdc;
912         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
913                                         false, GFP_NOIO, pages, bio);
914         if (!req) {
915                 up_read(&dev->header_rwsem);
916                 ret = -ENOMEM;
917                 goto done_pages;
918         }
919
920         req->r_callback = rbd_cb;
921
922         req_data->rq = rq;
923         req_data->bio = bio;
924         req_data->pages = pages;
925         req_data->len = len;
926
927         req->r_priv = req_data;
928
929         reqhead = req->r_request->front.iov_base;
930         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
931
932         strncpy(req->r_oid, obj, sizeof(req->r_oid));
933         req->r_oid_len = strlen(req->r_oid);
934
935         layout = &req->r_file_layout;
936         memset(layout, 0, sizeof(*layout));
937         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
938         layout->fl_stripe_count = cpu_to_le32(1);
939         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
940         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
941         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
942                                 req, ops);
943
944         ceph_osdc_build_request(req, ofs, &len,
945                                 ops,
946                                 snapc,
947                                 &mtime,
948                                 req->r_oid, req->r_oid_len);
949         up_read(&dev->header_rwsem);
950
951         if (linger_req) {
952                 ceph_osdc_set_request_linger(osdc, req);
953                 *linger_req = req;
954         }
955
956         ret = ceph_osdc_start_request(osdc, req, false);
957         if (ret < 0)
958                 goto done_err;
959
960         if (!rbd_cb) {
961                 ret = ceph_osdc_wait_request(osdc, req);
962                 if (ver)
963                         *ver = le64_to_cpu(req->r_reassert_version.version);
964                 dout("reassert_ver=%lld\n",
965                      le64_to_cpu(req->r_reassert_version.version));
966                 ceph_osdc_put_request(req);
967         }
968         return ret;
969
970 done_err:
971         bio_chain_put(req_data->bio);
972         ceph_osdc_put_request(req);
973 done_pages:
974         rbd_coll_end_req(req_data, ret, len);
975         kfree(req_data);
976         return ret;
977 }
978
/*
 * Ceph osd op callback
 *
 * Completion handler for async requests issued by rbd_do_request().
 * Decodes the osd reply, zero-fills nonexistent or short reads,
 * completes this request's slot in its collection, and releases the
 * per-request state (bio chain, osd request, rbd_request).
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* first op follows the fixed header */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* reading a hole (object doesn't exist): return zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail and report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1017
/* Minimal completion callback: just drop the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1022
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector covering [ofs, ofs+len), builds a single
 * read/write op when the caller didn't supply @orig_ops, issues the
 * request with no callback (rbd_do_request() then waits for
 * completion), and copies data between @buf and the pages according
 * to @flags.  Returns a negative errno on failure; on success the
 * value propagated from the request/copy path.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;	/* caller-built ops, if any */
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* build a single op; writes carry the data as payload */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* NULL callback => rbd_do_request() waits for the reply */
	ret = rbd_do_request(NULL, dev, snapc, snapid,
			  obj, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  2,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* for reads, copy the result back into the caller's buffer */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1085
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps [ofs, ofs+len) within the image onto a single backing object
 * (name/offset/length) and issues one async read/write op against it,
 * completing through rbd_req_cb().  The bio must already be clipped
 * at the segment boundary by the caller (see rbd_rq_fn()).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	/* translate image offset/length to object name, offset, length */
	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* writes carry their data as payload; reads have none */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1141
1142 /*
1143  * Request async osd write
1144  */
1145 static int rbd_req_write(struct request *rq,
1146                          struct rbd_device *rbd_dev,
1147                          struct ceph_snap_context *snapc,
1148                          u64 ofs, u64 len,
1149                          struct bio *bio,
1150                          struct rbd_req_coll *coll,
1151                          int coll_index)
1152 {
1153         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1154                          CEPH_OSD_OP_WRITE,
1155                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1156                          2,
1157                          ofs, len, bio, coll, coll_index);
1158 }
1159
1160 /*
1161  * Request async osd read
1162  */
1163 static int rbd_req_read(struct request *rq,
1164                          struct rbd_device *rbd_dev,
1165                          u64 snapid,
1166                          u64 ofs, u64 len,
1167                          struct bio *bio,
1168                          struct rbd_req_coll *coll,
1169                          int coll_index)
1170 {
1171         return rbd_do_op(rq, rbd_dev, NULL,
1172                          (snapid ? snapid : CEPH_NOSNAP),
1173                          CEPH_OSD_OP_READ,
1174                          CEPH_OSD_FLAG_READ,
1175                          2,
1176                          ofs, len, bio, coll, coll_index);
1177 }
1178
1179 /*
1180  * Request sync osd read
1181  */
1182 static int rbd_req_sync_read(struct rbd_device *dev,
1183                           struct ceph_snap_context *snapc,
1184                           u64 snapid,
1185                           const char *obj,
1186                           u64 ofs, u64 len,
1187                           char *buf,
1188                           u64 *ver)
1189 {
1190         return rbd_req_sync_op(dev, NULL,
1191                                (snapid ? snapid : CEPH_NOSNAP),
1192                                CEPH_OSD_OP_READ,
1193                                CEPH_OSD_FLAG_READ,
1194                                NULL,
1195                                1, obj, ofs, len, buf, NULL, ver);
1196 }
1197
/*
 * Acknowledge a received osd notification.  Called from the watch
 * callback path, so completion is handled by rbd_simple_req_cb()
 * rather than waiting.  (The @ver parameter is currently unused; the
 * cached header.obj_version is sent instead.)
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	/* echo the header version and the notification's cookie */
	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			  obj, 0, 0, NULL,
			  pages, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  1,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1230
/*
 * Watch event callback: the header object changed (e.g. a snapshot
 * was created), so refresh the cached snapshot metadata and ack the
 * notification so the osd stops resending it.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
		notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", dev->major, rc);

	/* ack even if the refresh failed, to avoid endless resends */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1250
/*
 * Request sync osd watch
 *
 * Register a watch on the header object so we're notified of changes
 * (snapshots, resizes).  The request lingers (stored in
 * dev->watch_request) so the osd client re-establishes it after a
 * connection reset.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* flag != 0 registers the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1295
/*
 * Request sync osd unwatch
 *
 * Tear down the watch registered by rbd_req_sync_watch() and release
 * the associated osd event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *dev,
				const char *obj)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* flag == 0 unregisters the watch identified by the cookie */
	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
	return ret;
}
1324
/* Context passed to rbd_notify_cb() while our own notify is pending. */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1328
1329 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1330 {
1331         struct rbd_device *dev = (struct rbd_device *)data;
1332         if (!dev)
1333                 return;
1334
1335         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1336                 notify_id, (int)opcode);
1337 }
1338
/*
 * Request sync osd notify
 *
 * Send a notification to all watchers of @obj (e.g. after creating a
 * snapshot) and wait, with a timeout, for them to acknowledge.
 */
static int rbd_req_sync_notify(struct rbd_device *dev,
			  const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* two 32-bit fields — presumably prot_ver + timeout; verify
	   against the osd_client notify encoding */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.dev = dev;

	/* one-shot event, completed once all watchers have acked */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;	/* presumably seconds — confirm */

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1389
/*
 * Request sync osd class-method call (CEPH_OSD_OP_CALL): execute
 * @cls.@method on @obj with @data as input, e.g. "rbd.snap_add".
 * (The previous comment said "read", which was a copy/paste error.)
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1429
1430 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1431 {
1432         struct rbd_req_coll *coll =
1433                         kzalloc(sizeof(struct rbd_req_coll) +
1434                                 sizeof(struct rbd_req_status) * num_reqs,
1435                                 GFP_ATOMIC);
1436
1437         if (!coll)
1438                 return NULL;
1439         coll->total = num_reqs;
1440         kref_init(&coll->kref);
1441         return coll;
1442 }
1443
/*
 * block device queue callback
 *
 * For each block request: split it into per-object segments, clone
 * the bio chain at segment boundaries, and submit one async osd op
 * per segment.  A ref-counted collection (coll) gathers per-segment
 * completions so the block request finishes once all segments do.
 * The queue lock is dropped around submission since the osd path may
 * sleep.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* submission may sleep; can't hold the queue lock */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			/* one ref per segment; dropped on completion */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the initial ref taken by rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
	}
}
1544
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns how many bytes of @bvec may be merged into the bio
 * described by @bmd without crossing an object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;	/* object size, in sectors */
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes remaining in the object past the bio's current end */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;	/* single-page bio exception */
	return max;
}
1571
/*
 * Release the gendisk, its request queue and the cached image header
 * for @rbd_dev.  Safe to call when no disk was ever allocated.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1587
/*
 * (Re)read the ondisk image header into @header.
 *
 * Returns 0 (or a positive byte count from the read path) on success,
 * negative errno on failure.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;	/* snap count the last read was sized for */
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s", rbd_dev->obj);
			goto out_dh;
		}

		/* done unless a snapshot appeared since the last read */
		if (snap_count == header->total_snaps)
			break;

		/* retry with room for current snap records and names */
		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1644
1645 /*
1646  * create a snapshot
1647  */
1648 static int rbd_header_add_snap(struct rbd_device *dev,
1649                                const char *snap_name,
1650                                gfp_t gfp_flags)
1651 {
1652         int name_len = strlen(snap_name);
1653         u64 new_snapid;
1654         int ret;
1655         void *data, *p, *e;
1656         u64 ver;
1657         struct ceph_mon_client *monc;
1658
1659         /* we should create a snapshot only if we're pointing at the head */
1660         if (dev->cur_snap)
1661                 return -EINVAL;
1662
1663         monc = &dev->rbd_client->client->monc;
1664         ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1665         dout("created snapid=%lld\n", new_snapid);
1666         if (ret < 0)
1667                 return ret;
1668
1669         data = kmalloc(name_len + 16, gfp_flags);
1670         if (!data)
1671                 return -ENOMEM;
1672
1673         p = data;
1674         e = data + name_len + 16;
1675
1676         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1677         ceph_encode_64_safe(&p, e, new_snapid, bad);
1678
1679         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1680                                 data, p - data, &ver);
1681
1682         kfree(data);
1683
1684         if (ret < 0)
1685                 return ret;
1686
1687         dev->header.snapc->seq =  new_snapid;
1688
1689         return 0;
1690 bad:
1691         return -ERANGE;
1692 }
1693
1694 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1695 {
1696         struct rbd_snap *snap;
1697
1698         while (!list_empty(&rbd_dev->snaps)) {
1699                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1700                 __rbd_remove_snap_dev(rbd_dev, snap);
1701         }
1702 }
1703
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header, updates the disk capacity, and swaps the
 * cached snapshot context/names/sizes under header_rwsem.  Keeps the
 * mapped snap_seq, following the head if we were pointing at it.
 */
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	/* resized? */
	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);

	down_write(&rbd_dev->header_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* swap in the freshly read snapshot metadata */
	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1750
/*
 * Set up the gendisk and request queue for the mapped image: read the
 * ondisk header, build the snapshot list, select the mapped snapshot,
 * then allocate and announce the disk.  Returns 0 or negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios within a single object (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1823
1824 /*
1825   sysfs
1826 */
1827
/* Map a sysfs struct device back to its embedding rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1832
1833 static ssize_t rbd_size_show(struct device *dev,
1834                              struct device_attribute *attr, char *buf)
1835 {
1836         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1837
1838         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1839 }
1840
1841 static ssize_t rbd_major_show(struct device *dev,
1842                               struct device_attribute *attr, char *buf)
1843 {
1844         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1845
1846         return sprintf(buf, "%d\n", rbd_dev->major);
1847 }
1848
1849 static ssize_t rbd_client_id_show(struct device *dev,
1850                                   struct device_attribute *attr, char *buf)
1851 {
1852         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1853
1854         return sprintf(buf, "client%lld\n",
1855                         ceph_client_id(rbd_dev->rbd_client->client));
1856 }
1857
1858 static ssize_t rbd_pool_show(struct device *dev,
1859                              struct device_attribute *attr, char *buf)
1860 {
1861         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862
1863         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1864 }
1865
1866 static ssize_t rbd_name_show(struct device *dev,
1867                              struct device_attribute *attr, char *buf)
1868 {
1869         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870
1871         return sprintf(buf, "%s\n", rbd_dev->obj);
1872 }
1873
1874 static ssize_t rbd_snap_show(struct device *dev,
1875                              struct device_attribute *attr,
1876                              char *buf)
1877 {
1878         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879
1880         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1881 }
1882
1883 static ssize_t rbd_image_refresh(struct device *dev,
1884                                  struct device_attribute *attr,
1885                                  const char *buf,
1886                                  size_t size)
1887 {
1888         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1889         int rc;
1890         int ret = size;
1891
1892         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1893
1894         rc = __rbd_update_snaps(rbd_dev);
1895         if (rc < 0)
1896                 ret = rc;
1897
1898         mutex_unlock(&ctl_mutex);
1899         return ret;
1900 }
1901
/*
 * Attribute files published under /sys/bus/rbd/devices/<id>/.
 * size, major, client_id, pool, name and current_snap are read-only
 * status; refresh and create_snap are write-only controls handled by
 * rbd_image_refresh() and rbd_snap_add() respectively.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1910
/* All of the above attributes, gathered into a single sysfs group. */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list, the form device_type.groups expects. */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
1931
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* Intentionally empty: rbd_bus_add_dev() installs rbd_dev_release()
	 * as the per-device dev->release, which does the real cleanup —
	 * NOTE(review): confirm dev->release is always set before this
	 * type-level release could ever be consulted. */
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1941
1942
1943 /*
1944   sysfs - snapshots
1945 */
1946
1947 static ssize_t rbd_snap_size_show(struct device *dev,
1948                                   struct device_attribute *attr,
1949                                   char *buf)
1950 {
1951         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1952
1953         return sprintf(buf, "%zd\n", snap->size);
1954 }
1955
1956 static ssize_t rbd_snap_id_show(struct device *dev,
1957                                 struct device_attribute *attr,
1958                                 char *buf)
1959 {
1960         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1961
1962         return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1963 }
1964
/* Per-snapshot attribute files (read-only): snap_size and snap_id. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1977
1978 static void rbd_snap_dev_release(struct device *dev)
1979 {
1980         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981         kfree(snap->name);
1982         kfree(snap);
1983 }
1984
/* Group list and device type for snapshot sub-devices; the release
 * callback (rbd_snap_dev_release) frees the rbd_snap itself. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1994
/* Unlink a snapshot from its device's snap list and tear down its
 * sysfs entry.  rbd_snap_dev_release() frees the snap when the last
 * reference drops, so the list_del() must come first. */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2001
2002 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2003                                   struct rbd_snap *snap,
2004                                   struct device *parent)
2005 {
2006         struct device *dev = &snap->dev;
2007         int ret;
2008
2009         dev->type = &rbd_snap_device_type;
2010         dev->parent = parent;
2011         dev->release = rbd_snap_dev_release;
2012         dev_set_name(dev, "snap_%s", snap->name);
2013         ret = device_register(dev);
2014
2015         return ret;
2016 }
2017
2018 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2019                               int i, const char *name,
2020                               struct rbd_snap **snapp)
2021 {
2022         int ret;
2023         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2024         if (!snap)
2025                 return -ENOMEM;
2026         snap->name = kstrdup(name, GFP_KERNEL);
2027         snap->size = rbd_dev->header.snap_sizes[i];
2028         snap->id = rbd_dev->header.snapc->snaps[i];
2029         if (device_is_registered(&rbd_dev->dev)) {
2030                 ret = rbd_register_snap_dev(rbd_dev, snap,
2031                                              &rbd_dev->dev);
2032                 if (ret < 0)
2033                         goto err;
2034         }
2035         *snapp = snap;
2036         return 0;
2037 err:
2038         kfree(snap->name);
2039         kfree(snap);
2040         return ret;
2041 }
2042
/*
 * Walk backward to the name preceding 'name' in a '\0'-delimited list
 * of strings beginning at 'start'.  Returns NULL if 'name' is already
 * the first entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	/* There must be room for at least "x\0" before us. */
	if (name < start + 2)
		return NULL;

	/* Step over the previous name's terminating '\0', then scan back
	 * until the '\0' before it (or the start of the list). */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;
}
2059
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed a snapshot and recreated a new
 * one with the same name).
 *
 * Returns 0 on success or a negative errno if a snapshot device
 * could not be created or the name list is inconsistent.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* snap_names is a '\0'-delimited list; start past its end and
	 * walk backward with rbd_prev_snap_name() as 'i' counts down. */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		/* cur_id is only meaningful while i > 0; the !i test below
		 * short-circuits before reading it otherwise. */
		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				/* fewer names than snap ids: header corrupt */
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2133
/* Register the rbd device (and any snapshots found so far) on the
 * rbd bus in sysfs.  Returns 0 or a negative errno. */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	/* Serialize with other control-path operations. */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	/* The device is named after its numeric id and hangs off the
	 * shared rbd root device. */
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	/* Publish snapshots collected before the device was registered. */
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2162
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	/* Remove from sysfs; final cleanup runs in rbd_dev_release(). */
	device_unregister(&rbd_dev->dev);
}
2167
/* Establish a watch on the image's metadata object so we hear about
 * header changes.  Returns 0 or a negative errno. */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
					 rbd_dev->header.obj_version);
		/* On -ERANGE, refresh our snapshot context and retry —
		 * presumably the obj_version we passed was stale. */
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_update_snaps(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2186
/* Highest device id handed out so far; see rbd_id_put() for how it
 * shrinks when the current maximum is released. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	/* rbd_dev_list is protected by rbd_dev_list_lock, not ctl_mutex. */
	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2201
2202 /*
2203  * Remove an rbd_dev from the global list, and record that its
2204  * identifier is no longer in use.
2205  */
2206 static void rbd_id_put(struct rbd_device *rbd_dev)
2207 {
2208         struct list_head *tmp;
2209         int rbd_id = rbd_dev->id;
2210         int max_id;
2211
2212         BUG_ON(rbd_id < 1);
2213
2214         spin_lock(&rbd_dev_list_lock);
2215         list_del_init(&rbd_dev->node);
2216
2217         /*
2218          * If the id being "put" is not the current maximum, there
2219          * is nothing special we need to do.
2220          */
2221         if (rbd_id != atomic64_read(&rbd_id_max)) {
2222                 spin_unlock(&rbd_dev_list_lock);
2223                 return;
2224         }
2225
2226         /*
2227          * We need to update the current maximum id.  Search the
2228          * list to find out what it is.  We're more likely to find
2229          * the maximum at the end, so search the list backward.
2230          */
2231         max_id = 0;
2232         list_for_each_prev(tmp, &rbd_dev_list) {
2233                 struct rbd_device *rbd_dev;
2234
2235                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2236                 if (rbd_id > max_id)
2237                         max_id = rbd_id;
2238         }
2239         spin_unlock(&rbd_dev_list_lock);
2240
2241         /*
2242          * The max id could have been updated by rbd_id_get(), in
2243          * which case it now accurately reflects the new maximum.
2244          * Be careful not to overwrite the maximum value in that
2245          * case.
2246          */
2247         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2248 }
2249
2250 /*
2251  * Skips over white space at *buf, and updates *buf to point to the
2252  * first found non-space character (if any). Returns the length of
2253  * the token (string of non-white space characters) found.  Note
2254  * that *buf must be terminated with '\0'.
2255  */
2256 static inline size_t next_token(const char **buf)
2257 {
2258         /*
2259         * These are the characters that produce nonzero for
2260         * isspace() in the "C" and "POSIX" locales.
2261         */
2262         const char *spaces = " \f\n\r\t\v";
2263
2264         *buf += strspn(*buf, spaces);   /* Find start of token */
2265
2266         return strcspn(*buf, spaces);   /* Return token length */
2267 }
2268
2269 /*
2270  * Finds the next token in *buf, and if the provided token buffer is
2271  * big enough, copies the found token into it.  The result, if
2272  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2273  * must be terminated with '\0' on entry.
2274  *
2275  * Returns the length of the token found (not including the '\0').
2276  * Return value will be 0 if no token is found, and it will be >=
2277  * token_size if the token would not fit.
2278  *
2279  * The *buf pointer will be updated to point beyond the end of the
2280  * found token.  Note that this occurs even if the token buffer is
2281  * too small to hold it.
2282  */
2283 static inline size_t copy_token(const char **buf,
2284                                 char *token,
2285                                 size_t token_size)
2286 {
2287         size_t len;
2288
2289         len = next_token(buf);
2290         if (len < token_size) {
2291                 memcpy(token, *buf, len);
2292                 *(token + len) = '\0';
2293         }
2294         *buf += len;
2295
2296         return len;
2297 }
2298
2299 /*
2300  * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2301  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2302  * on the list of monitor addresses and other options provided via
2303  * /sys/bus/rbd/add.
2304  */
2305 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2306                               const char *buf,
2307                               const char **mon_addrs,
2308                               size_t *mon_addrs_size,
2309                               char *options,
2310                               size_t options_size)
2311 {
2312         size_t  len;
2313
2314         /* The first four tokens are required */
2315
2316         len = next_token(&buf);
2317         if (!len)
2318                 return -EINVAL;
2319         *mon_addrs_size = len + 1;
2320         *mon_addrs = buf;
2321
2322         buf += len;
2323
2324         len = copy_token(&buf, options, options_size);
2325         if (!len || len >= options_size)
2326                 return -EINVAL;
2327
2328         len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2329         if (!len || len >= sizeof (rbd_dev->pool_name))
2330                 return -EINVAL;
2331
2332         len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2333         if (!len || len >= sizeof (rbd_dev->obj))
2334                 return -EINVAL;
2335
2336         /* We have the object length in hand, save it. */
2337
2338         rbd_dev->obj_len = len;
2339
2340         BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2341                                 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2342         sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2343
2344         /*
2345          * The snapshot name is optional, but it's an error if it's
2346          * too long.  If no snapshot is supplied, fill in the default.
2347          */
2348         len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2349         if (!len)
2350                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2351                         sizeof (RBD_SNAP_HEAD_NAME));
2352         else if (len >= sizeof (rbd_dev->snap_name))
2353                 return -EINVAL;
2354
2355         return 0;
2356 }
2357
2358 static ssize_t rbd_add(struct bus_type *bus,
2359                        const char *buf,
2360                        size_t count)
2361 {
2362         struct rbd_device *rbd_dev;
2363         const char *mon_addrs = NULL;
2364         size_t mon_addrs_size = 0;
2365         char *options = NULL;
2366         struct ceph_osd_client *osdc;
2367         int rc = -ENOMEM;
2368
2369         if (!try_module_get(THIS_MODULE))
2370                 return -ENODEV;
2371
2372         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2373         if (!rbd_dev)
2374                 goto err_nomem;
2375         options = kmalloc(count, GFP_KERNEL);
2376         if (!options)
2377                 goto err_nomem;
2378
2379         /* static rbd_device initialization */
2380         spin_lock_init(&rbd_dev->lock);
2381         INIT_LIST_HEAD(&rbd_dev->node);
2382         INIT_LIST_HEAD(&rbd_dev->snaps);
2383         init_rwsem(&rbd_dev->header_rwsem);
2384
2385         init_rwsem(&rbd_dev->header_rwsem);
2386
2387         /* generate unique id: find highest unique id, add one */
2388         rbd_id_get(rbd_dev);
2389
2390         /* Fill in the device name, now that we have its id. */
2391         BUILD_BUG_ON(DEV_NAME_LEN
2392                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2393         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2394
2395         /* parse add command */
2396         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2397                                 options, count);
2398         if (rc)
2399                 goto err_put_id;
2400
2401         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2402                                                 options);
2403         if (IS_ERR(rbd_dev->rbd_client)) {
2404                 rc = PTR_ERR(rbd_dev->rbd_client);
2405                 goto err_put_id;
2406         }
2407
2408         /* pick the pool */
2409         osdc = &rbd_dev->rbd_client->client->osdc;
2410         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2411         if (rc < 0)
2412                 goto err_out_client;
2413         rbd_dev->poolid = rc;
2414
2415         /* register our block device */
2416         rc = register_blkdev(0, rbd_dev->name);
2417         if (rc < 0)
2418                 goto err_out_client;
2419         rbd_dev->major = rc;
2420
2421         rc = rbd_bus_add_dev(rbd_dev);
2422         if (rc)
2423                 goto err_out_blkdev;
2424
2425         /*
2426          * At this point cleanup in the event of an error is the job
2427          * of the sysfs code (initiated by rbd_bus_del_dev()).
2428          *
2429          * Set up and announce blkdev mapping.
2430          */
2431         rc = rbd_init_disk(rbd_dev);
2432         if (rc)
2433                 goto err_out_bus;
2434
2435         rc = rbd_init_watch_dev(rbd_dev);
2436         if (rc)
2437                 goto err_out_bus;
2438
2439         return count;
2440
2441 err_out_bus:
2442         /* this will also clean up rest of rbd_dev stuff */
2443
2444         rbd_bus_del_dev(rbd_dev);
2445         kfree(options);
2446         return rc;
2447
2448 err_out_blkdev:
2449         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2450 err_out_client:
2451         rbd_put_client(rbd_dev);
2452 err_put_id:
2453         rbd_id_put(rbd_dev);
2454 err_nomem:
2455         kfree(options);
2456         kfree(rbd_dev);
2457
2458         dout("Error adding device %s\n", buf);
2459         module_put(THIS_MODULE);
2460
2461         return (ssize_t) rc;
2462 }
2463
2464 static struct rbd_device *__rbd_get_dev(unsigned long id)
2465 {
2466         struct list_head *tmp;
2467         struct rbd_device *rbd_dev;
2468
2469         spin_lock(&rbd_dev_list_lock);
2470         list_for_each(tmp, &rbd_dev_list) {
2471                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2472                 if (rbd_dev->id == id) {
2473                         spin_unlock(&rbd_dev_list_lock);
2474                         return rbd_dev;
2475                 }
2476         }
2477         spin_unlock(&rbd_dev_list_lock);
2478         return NULL;
2479 }
2480
/*
 * Release callback for the rbd device: runs when the last reference
 * to rbd_dev->dev is dropped after rbd_bus_del_dev().  Undoes the
 * setup performed in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the metadata object before tearing anything down. */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2507
/* Handle a write to /sys/bus/rbd/remove: "<id>" selects the device. */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* Remaining teardown happens in rbd_dev_release() once the
	 * device's last reference goes away. */
	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2541
/*
 * Handle a write to the "create_snap" attribute: create a snapshot
 * with the written name and resync our snapshot list.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* NOTE(review): a size of 'count' copies at most count - 1 bytes,
	 * so the input's final byte is dropped — normally the trailing
	 * newline from "echo", but a name written without one loses its
	 * last character; confirm this is intended. */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_update_snaps(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2582
2583 /*
2584  * create control files in sysfs
2585  * /sys/bus/rbd/...
2586  */
2587 static int rbd_sysfs_init(void)
2588 {
2589         int ret;
2590
2591         ret = device_register(&rbd_root_dev);
2592         if (ret < 0)
2593                 return ret;
2594
2595         ret = bus_register(&rbd_bus_type);
2596         if (ret < 0)
2597                 device_unregister(&rbd_root_dev);
2598
2599         return ret;
2600 }
2601
static void rbd_sysfs_cleanup(void)
{
	/* Reverse order of rbd_sysfs_init(). */
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2607
2608 int __init rbd_init(void)
2609 {
2610         int rc;
2611
2612         rc = rbd_sysfs_init();
2613         if (rc)
2614                 return rc;
2615         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2616         return 0;
2617 }
2618
void __exit rbd_exit(void)
{
	/* Tear down the sysfs entries created in rbd_init(). */
	rbd_sysfs_cleanup();
}
2623
/* Module entry/exit points and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");