Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Nov 2015 17:24:40 +0000 (09:24 -0800)
committerLinus Torvalds <torvalds@linux-foundation.org>
Fri, 13 Nov 2015 17:24:40 +0000 (09:24 -0800)
Pull Ceph updates from Sage Weil:
 "There are several patches from Ilya fixing RBD allocation lifecycle
  issues, a series adding a nocephx_sign_messages option (and associated
  bug fixes/cleanups), several patches from Zheng improving the
  (directory) fsync behavior, a big improvement in IO for direct-io
  requests when striping is enabled from Caifeng, and several other
  small fixes and cleanups"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: clear msg->con in ceph_msg_release() only
  libceph: add nocephx_sign_messages option
  libceph: stop duplicating client fields in messenger
  libceph: drop authorizer check from cephx msg signing routines
  libceph: msg signing callouts don't need con argument
  libceph: evaluate osd_req_op_data() arguments only once
  ceph: make fsync() wait unsafe requests that created/modified inode
  ceph: add request to i_unsafe_dirops when getting unsafe reply
  libceph: introduce ceph_x_authorizer_cleanup()
  ceph: don't invalidate page cache when inode is no longer used
  rbd: remove duplicate calls to rbd_dev_mapping_clear()
  rbd: set device_type::release instead of device::release
  rbd: don't free rbd_dev outside of the release callback
  rbd: return -ENOMEM instead of pool id if rbd_dev_create() fails
  libceph: use local variable cursor instead of &msg->cursor
  libceph: remove con argument in handle_reply()
  ceph: combine as many iovec as possile into one OSD request
  ceph: fix message length computation
  ceph: fix a comment typo
  rbd: drop null test before destroy functions

15 files changed:
drivers/block/rbd.c
fs/ceph/cache.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h
include/linux/ceph/libceph.h
include/linux/ceph/messenger.h
net/ceph/auth_x.c
net/ceph/ceph_common.c
net/ceph/crypto.h
net/ceph/messenger.c
net/ceph/osd_client.c

index 128e7df5b807222ba2d6f50148cb164919a308f7..235708c7c46eee709acb110acb95a28d85ebc946 100644 (file)
@@ -418,8 +418,6 @@ MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (d
 
 static int rbd_img_request_submit(struct rbd_img_request *img_request);
 
-static void rbd_dev_device_release(struct device *dev);
-
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
@@ -3991,14 +3989,12 @@ static const struct attribute_group *rbd_attr_groups[] = {
        NULL
 };
 
-static void rbd_sysfs_dev_release(struct device *dev)
-{
-}
+static void rbd_dev_release(struct device *dev);
 
 static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
-       .release        = rbd_sysfs_dev_release,
+       .release        = rbd_dev_release,
 };
 
 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
@@ -4041,6 +4037,25 @@ static void rbd_spec_free(struct kref *kref)
        kfree(spec);
 }
 
+static void rbd_dev_release(struct device *dev)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       bool need_put = !!rbd_dev->opts;
+
+       rbd_put_client(rbd_dev->rbd_client);
+       rbd_spec_put(rbd_dev->spec);
+       kfree(rbd_dev->opts);
+       kfree(rbd_dev);
+
+       /*
+        * This is racy, but way better than putting module outside of
+        * the release callback.  The race window is pretty small, so
+        * doing something similar to dm (dm-builtin.c) is overkill.
+        */
+       if (need_put)
+               module_put(THIS_MODULE);
+}
+
 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                                         struct rbd_spec *spec,
                                         struct rbd_options *opts)
@@ -4057,6 +4072,11 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        INIT_LIST_HEAD(&rbd_dev->node);
        init_rwsem(&rbd_dev->header_rwsem);
 
+       rbd_dev->dev.bus = &rbd_bus_type;
+       rbd_dev->dev.type = &rbd_device_type;
+       rbd_dev->dev.parent = &rbd_root_dev;
+       device_initialize(&rbd_dev->dev);
+
        rbd_dev->rbd_client = rbdc;
        rbd_dev->spec = spec;
        rbd_dev->opts = opts;
@@ -4068,15 +4088,21 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
 
+       /*
+        * If this is a mapping rbd_dev (as opposed to a parent one),
+        * pin our module.  We have a ref from do_rbd_add(), so use
+        * __module_get().
+        */
+       if (rbd_dev->opts)
+               __module_get(THIS_MODULE);
+
        return rbd_dev;
 }
 
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
 {
-       rbd_put_client(rbd_dev->rbd_client);
-       rbd_spec_put(rbd_dev->spec);
-       kfree(rbd_dev->opts);
-       kfree(rbd_dev);
+       if (rbd_dev)
+               put_device(&rbd_dev->dev);
 }
 
 /*
@@ -4702,27 +4728,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
        return rbd_dev_v2_header_info(rbd_dev);
 }
 
-static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
-{
-       struct device *dev;
-       int ret;
-
-       dev = &rbd_dev->dev;
-       dev->bus = &rbd_bus_type;
-       dev->type = &rbd_device_type;
-       dev->parent = &rbd_root_dev;
-       dev->release = rbd_dev_device_release;
-       dev_set_name(dev, "%d", rbd_dev->dev_id);
-       ret = device_register(dev);
-
-       return ret;
-}
-
-static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
-{
-       device_unregister(&rbd_dev->dev);
-}
-
 /*
  * Get a unique rbd identifier for the given new rbd_dev, and add
  * the rbd_dev to the global list.
@@ -5225,7 +5230,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
-       ret = rbd_bus_add_dev(rbd_dev);
+       dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
+       ret = device_add(&rbd_dev->dev);
        if (ret)
                goto err_out_mapping;
 
@@ -5248,8 +5254,6 @@ err_out_blkdev:
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_id:
        rbd_dev_id_put(rbd_dev);
-       rbd_dev_mapping_clear(rbd_dev);
-
        return ret;
 }
 
@@ -5397,7 +5401,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        struct rbd_spec *spec = NULL;
        struct rbd_client *rbdc;
        bool read_only;
-       int rc = -ENOMEM;
+       int rc;
 
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;
@@ -5405,7 +5409,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        /* parse add command */
        rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (rc < 0)
-               goto err_out_module;
+               goto out;
 
        rbdc = rbd_get_client(ceph_opts);
        if (IS_ERR(rbdc)) {
@@ -5432,8 +5436,10 @@ static ssize_t do_rbd_add(struct bus_type *bus,
        }
 
        rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
-       if (!rbd_dev)
+       if (!rbd_dev) {
+               rc = -ENOMEM;
                goto err_out_client;
+       }
        rbdc = NULL;            /* rbd_dev now owns this */
        spec = NULL;            /* rbd_dev now owns this */
        rbd_opts = NULL;        /* rbd_dev now owns this */
@@ -5458,10 +5464,13 @@ static ssize_t do_rbd_add(struct bus_type *bus,
                 */
                rbd_dev_header_unwatch_sync(rbd_dev);
                rbd_dev_image_release(rbd_dev);
-               goto err_out_module;
+               goto out;
        }
 
-       return count;
+       rc = count;
+out:
+       module_put(THIS_MODULE);
+       return rc;
 
 err_out_rbd_dev:
        rbd_dev_destroy(rbd_dev);
@@ -5470,12 +5479,7 @@ err_out_client:
 err_out_args:
        rbd_spec_put(spec);
        kfree(rbd_opts);
-err_out_module:
-       module_put(THIS_MODULE);
-
-       dout("Error adding device %s\n", buf);
-
-       return (ssize_t)rc;
+       goto out;
 }
 
 static ssize_t rbd_add(struct bus_type *bus,
@@ -5495,17 +5499,15 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
        return do_rbd_add(bus, buf, count);
 }
 
-static void rbd_dev_device_release(struct device *dev)
+static void rbd_dev_device_release(struct rbd_device *rbd_dev)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-
        rbd_free_disk(rbd_dev);
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+       device_del(&rbd_dev->dev);
        rbd_dev_mapping_clear(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
        rbd_dev_id_put(rbd_dev);
-       rbd_dev_mapping_clear(rbd_dev);
 }
 
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
@@ -5590,9 +5592,8 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
         * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
         * in a potential use after free of rbd_dev->disk or rbd_dev.
         */
-       rbd_bus_del_dev(rbd_dev);
+       rbd_dev_device_release(rbd_dev);
        rbd_dev_image_release(rbd_dev);
-       module_put(THIS_MODULE);
 
        return count;
 }
@@ -5663,10 +5664,8 @@ static int rbd_slab_init(void)
        if (rbd_segment_name_cache)
                return 0;
 out_err:
-       if (rbd_obj_request_cache) {
-               kmem_cache_destroy(rbd_obj_request_cache);
-               rbd_obj_request_cache = NULL;
-       }
+       kmem_cache_destroy(rbd_obj_request_cache);
+       rbd_obj_request_cache = NULL;
 
        kmem_cache_destroy(rbd_img_request_cache);
        rbd_img_request_cache = NULL;
index 834f9f3723fbe3980e8a9705e8c1d0f86936eb40..a4766ded1ba78e8bbbff70d4c5e65b3f9060d687 100644 (file)
@@ -88,7 +88,7 @@ static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data,
        const struct ceph_inode_info* ci = cookie_netfs_data;
        uint16_t klen;
 
-       /* use ceph virtual inode (id + snaphot) */
+       /* use ceph virtual inode (id + snapshot) */
        klen = sizeof(ci->i_vino);
        if (klen > maxbuf)
                return 0;
index 27b566874bc1d2b0494b49ebe1769f380ccf8648..c69e1253b47bfbefb2eb2cb1564a5c4b9d87f9a3 100644 (file)
@@ -1655,9 +1655,8 @@ retry_locked:
            !S_ISDIR(inode->i_mode) &&          /* ignore readdir cache */
            ci->i_wrbuffer_ref == 0 &&          /* no dirty pages... */
            inode->i_data.nrpages &&            /* have cached pages */
-           (file_wanted == 0 ||                /* no open files */
-            (revoking & (CEPH_CAP_FILE_CACHE|
-                         CEPH_CAP_FILE_LAZYIO))) && /*  or revoking cache */
+           (revoking & (CEPH_CAP_FILE_CACHE|
+                        CEPH_CAP_FILE_LAZYIO)) && /*  or revoking cache */
            !tried_invalidate) {
                dout("check_caps trying to invalidate on %p\n", inode);
                if (try_nonblocking_invalidate(inode) < 0) {
@@ -1971,49 +1970,46 @@ out:
 }
 
 /*
- * wait for any uncommitted directory operations to commit.
+ * wait for any unsafe requests to complete.
  */
-static int unsafe_dirop_wait(struct inode *inode)
+static int unsafe_request_wait(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct list_head *head = &ci->i_unsafe_dirops;
-       struct ceph_mds_request *req;
-       u64 last_tid;
-       int ret = 0;
-
-       if (!S_ISDIR(inode->i_mode))
-               return 0;
+       struct ceph_mds_request *req1 = NULL, *req2 = NULL;
+       int ret, err = 0;
 
        spin_lock(&ci->i_unsafe_lock);
-       if (list_empty(head))
-               goto out;
-
-       req = list_last_entry(head, struct ceph_mds_request,
-                             r_unsafe_dir_item);
-       last_tid = req->r_tid;
-
-       do {
-               ceph_mdsc_get_request(req);
-               spin_unlock(&ci->i_unsafe_lock);
+       if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
+               req1 = list_last_entry(&ci->i_unsafe_dirops,
+                                       struct ceph_mds_request,
+                                       r_unsafe_dir_item);
+               ceph_mdsc_get_request(req1);
+       }
+       if (!list_empty(&ci->i_unsafe_iops)) {
+               req2 = list_last_entry(&ci->i_unsafe_iops,
+                                       struct ceph_mds_request,
+                                       r_unsafe_target_item);
+               ceph_mdsc_get_request(req2);
+       }
+       spin_unlock(&ci->i_unsafe_lock);
 
-               dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n",
-                    inode, req->r_tid, last_tid);
-               ret = !wait_for_completion_timeout(&req->r_safe_completion,
-                                       ceph_timeout_jiffies(req->r_timeout));
+       dout("unsafe_requeset_wait %p wait on tid %llu %llu\n",
+            inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
+       if (req1) {
+               ret = !wait_for_completion_timeout(&req1->r_safe_completion,
+                                       ceph_timeout_jiffies(req1->r_timeout));
                if (ret)
-                       ret = -EIO;  /* timed out */
-
-               ceph_mdsc_put_request(req);
-
-               spin_lock(&ci->i_unsafe_lock);
-               if (ret || list_empty(head))
-                       break;
-               req = list_first_entry(head, struct ceph_mds_request,
-                                      r_unsafe_dir_item);
-       } while (req->r_tid < last_tid);
-out:
-       spin_unlock(&ci->i_unsafe_lock);
-       return ret;
+                       err = -EIO;
+               ceph_mdsc_put_request(req1);
+       }
+       if (req2) {
+               ret = !wait_for_completion_timeout(&req2->r_safe_completion,
+                                       ceph_timeout_jiffies(req2->r_timeout));
+               if (ret)
+                       err = -EIO;
+               ceph_mdsc_put_request(req2);
+       }
+       return err;
 }
 
 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
@@ -2039,7 +2035,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
        dirty = try_flush_caps(inode, &flush_tid);
        dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
-       ret = unsafe_dirop_wait(inode);
+       ret = unsafe_request_wait(inode);
 
        /*
         * only wait on non-file metadata writeback (the mds
index 0c62868b5c561b37fa866b68bbe34f3ffb1fd6a6..3c68e6aee2f0531bdd8afb0125bbae48419d0dfb 100644 (file)
  * need to wait for MDS acknowledgement.
  */
 
+/*
+ * Calculate the length sum of direct io vectors that can
+ * be combined into one page vector.
+ */
+static size_t dio_get_pagev_size(const struct iov_iter *it)
+{
+    const struct iovec *iov = it->iov;
+    const struct iovec *iovend = iov + it->nr_segs;
+    size_t size;
+
+    size = iov->iov_len - it->iov_offset;
+    /*
+     * An iov can be page vectored when both the current tail
+     * and the next base are page aligned.
+     */
+    while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) &&
+           (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) {
+        size += iov->iov_len;
+    }
+    dout("dio_get_pagevlen len = %zu\n", size);
+    return size;
+}
+
+/*
+ * Allocate a page vector based on (@it, @nbytes).
+ * The return value is the tuple describing a page vector,
+ * that is (@pages, @page_align, @num_pages).
+ */
+static struct page **
+dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes,
+                   size_t *page_align, int *num_pages)
+{
+       struct iov_iter tmp_it = *it;
+       size_t align;
+       struct page **pages;
+       int ret = 0, idx, npages;
+
+       align = (unsigned long)(it->iov->iov_base + it->iov_offset) &
+               (PAGE_SIZE - 1);
+       npages = calc_pages_for(align, nbytes);
+       pages = kmalloc(sizeof(*pages) * npages, GFP_KERNEL);
+       if (!pages) {
+               pages = vmalloc(sizeof(*pages) * npages);
+               if (!pages)
+                       return ERR_PTR(-ENOMEM);
+       }
+
+       for (idx = 0; idx < npages; ) {
+               size_t start;
+               ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes,
+                                        npages - idx, &start);
+               if (ret < 0)
+                       goto fail;
+
+               iov_iter_advance(&tmp_it, ret);
+               nbytes -= ret;
+               idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE;
+       }
+
+       BUG_ON(nbytes != 0);
+       *num_pages = npages;
+       *page_align = align;
+       dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align);
+       return pages;
+fail:
+       ceph_put_page_vector(pages, idx, false);
+       return ERR_PTR(ret);
+}
 
 /*
  * Prepare an open request.  Preallocate ceph_cap to avoid an
@@ -458,11 +526,10 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
                        size_t start;
                        ssize_t n;
 
-                       n = iov_iter_get_pages_alloc(i, &pages, INT_MAX, &start);
-                       if (n < 0)
-                               return n;
-
-                       num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
+                       n = dio_get_pagev_size(i);
+                       pages = dio_get_pages_alloc(i, n, &start, &num_pages);
+                       if (IS_ERR(pages))
+                               return PTR_ERR(pages);
 
                        ret = striped_read(inode, off, n,
                                           pages, num_pages, checkeof,
@@ -592,7 +659,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
                CEPH_OSD_FLAG_WRITE;
 
        while (iov_iter_count(from) > 0) {
-               u64 len = iov_iter_single_seg_count(from);
+               u64 len = dio_get_pagev_size(from);
                size_t start;
                ssize_t n;
 
@@ -611,14 +678,14 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 
                osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
 
-               n = iov_iter_get_pages_alloc(from, &pages, len, &start);
-               if (unlikely(n < 0)) {
-                       ret = n;
+               n = len;
+               pages = dio_get_pages_alloc(from, len, &start, &num_pages);
+               if (IS_ERR(pages)) {
                        ceph_osdc_put_request(req);
+                       ret = PTR_ERR(pages);
                        break;
                }
 
-               num_pages = (n + start + PAGE_SIZE - 1) / PAGE_SIZE;
                /*
                 * throw out any page cache pages in this range. this
                 * may block.
index 96d2bd8299022e554c8bfdc0b7f1c759be8fc8cd..498dcfa2dcdbedf393ae26fc9f7f68cf90bceb90 100644 (file)
@@ -452,6 +452,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 
        INIT_LIST_HEAD(&ci->i_unsafe_writes);
        INIT_LIST_HEAD(&ci->i_unsafe_dirops);
+       INIT_LIST_HEAD(&ci->i_unsafe_iops);
        spin_lock_init(&ci->i_unsafe_lock);
 
        ci->i_snap_realm = NULL;
index 51cb02da75d98979b18e05d7bdad4dee89e258d4..e7b130a637f9f31a34efd67d6cf5789a8fcdef00 100644 (file)
@@ -633,13 +633,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
                mdsc->oldest_tid = req->r_tid;
 
        if (dir) {
-               struct ceph_inode_info *ci = ceph_inode(dir);
-
                ihold(dir);
-               spin_lock(&ci->i_unsafe_lock);
                req->r_unsafe_dir = dir;
-               list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
-               spin_unlock(&ci->i_unsafe_lock);
        }
 }
 
@@ -665,13 +660,20 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
        rb_erase(&req->r_node, &mdsc->request_tree);
        RB_CLEAR_NODE(&req->r_node);
 
-       if (req->r_unsafe_dir) {
+       if (req->r_unsafe_dir && req->r_got_unsafe) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
-
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
+       }
+       if (req->r_target_inode && req->r_got_unsafe) {
+               struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+               spin_lock(&ci->i_unsafe_lock);
+               list_del_init(&req->r_unsafe_target_item);
+               spin_unlock(&ci->i_unsafe_lock);
+       }
 
+       if (req->r_unsafe_dir) {
                iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }
@@ -1430,6 +1432,13 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
                if ((used | wanted) & CEPH_CAP_ANY_WR)
                        goto out;
        }
+       /* The inode has cached pages, but it's no longer used.
+        * we can safely drop it */
+       if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
+           !(oissued & CEPH_CAP_FILE_CACHE)) {
+         used = 0;
+         oissued = 0;
+       }
        if ((used | wanted) & ~oissued & mine)
                goto out;   /* we need these caps */
 
@@ -1438,7 +1447,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
                /* we aren't the only cap.. just remove us */
                __ceph_remove_cap(cap, true);
        } else {
-               /* try to drop referring dentries */
+               /* try dropping referring dentries */
                spin_unlock(&ci->i_ceph_lock);
                d_prune_aliases(inode);
                dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
@@ -1704,6 +1713,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
+       INIT_LIST_HEAD(&req->r_unsafe_target_item);
        req->r_fmode = -1;
        kref_init(&req->r_kref);
        INIT_LIST_HEAD(&req->r_wait);
@@ -1935,7 +1945,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 
        len = sizeof(*head) +
                pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
-               sizeof(struct timespec);
+               sizeof(struct ceph_timespec);
 
        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
@@ -2477,6 +2487,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        } else {
                req->r_got_unsafe = true;
                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
+               if (req->r_unsafe_dir) {
+                       struct ceph_inode_info *ci =
+                                       ceph_inode(req->r_unsafe_dir);
+                       spin_lock(&ci->i_unsafe_lock);
+                       list_add_tail(&req->r_unsafe_dir_item,
+                                     &ci->i_unsafe_dirops);
+                       spin_unlock(&ci->i_unsafe_lock);
+               }
        }
 
        dout("handle_reply tid %lld result %d\n", tid, result);
@@ -2518,6 +2536,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        up_read(&mdsc->snap_rwsem);
        if (realm)
                ceph_put_snap_realm(mdsc, realm);
+
+       if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
+               struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+               spin_lock(&ci->i_unsafe_lock);
+               list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
+               spin_unlock(&ci->i_unsafe_lock);
+       }
 out_err:
        mutex_lock(&mdsc->mutex);
        if (!req->r_aborted) {
@@ -3917,17 +3942,19 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
        return msg;
 }
 
-static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+static int mds_sign_message(struct ceph_msg *msg)
 {
-       struct ceph_mds_session *s = con->private;
+       struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;
+
        return ceph_auth_sign_message(auth, msg);
 }
 
-static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+static int mds_check_message_signature(struct ceph_msg *msg)
 {
-       struct ceph_mds_session *s = con->private;
+       struct ceph_mds_session *s = msg->con->private;
        struct ceph_auth_handshake *auth = &s->s_auth;
+
        return ceph_auth_check_message_signature(auth, msg);
 }
 
@@ -3940,8 +3967,8 @@ static const struct ceph_connection_operations mds_con_ops = {
        .invalidate_authorizer = invalidate_authorizer,
        .peer_reset = peer_reset,
        .alloc_msg = mds_alloc_msg,
-       .sign_message = sign_message,
-       .check_message_signature = check_message_signature,
+       .sign_message = mds_sign_message,
+       .check_message_signature = mds_check_message_signature,
 };
 
 /* eof */
index f575eafe2261cbd5974d8d4f072879e9d5bd7a39..ccf11ef0ca8717f79e6bfc9d6cf806ec359b5e8c 100644 (file)
@@ -236,6 +236,9 @@ struct ceph_mds_request {
        struct inode    *r_unsafe_dir;
        struct list_head r_unsafe_dir_item;
 
+       /* unsafe requests that modify the target inode */
+       struct list_head r_unsafe_target_item;
+
        struct ceph_mds_session *r_session;
 
        int               r_attempts;   /* resend attempts */
index 2f2460d23a0600f8f9bf2e1cc4fe3b2286684356..75b7d125ce668a9b3f4f851d2be1c688d6d5f964 100644 (file)
@@ -342,6 +342,7 @@ struct ceph_inode_info {
 
        struct list_head i_unsafe_writes; /* uncommitted sync writes */
        struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */
+       struct list_head i_unsafe_iops;   /* uncommitted mds inode ops */
        spinlock_t i_unsafe_lock;
 
        struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
index 397c5cd09794854ebc8891fac59b4dfa50e7e141..3e3799cdc6e66d719fed8ae9adc8539d4e2cb0c6 100644 (file)
@@ -29,8 +29,9 @@
 #define CEPH_OPT_NOSHARE          (1<<1) /* don't share client with other sbs */
 #define CEPH_OPT_MYIP             (1<<2) /* specified my ip */
 #define CEPH_OPT_NOCRC            (1<<3) /* no data crc on writes */
-#define CEPH_OPT_NOMSGAUTH       (1<<4) /* not require cephx message signature */
+#define CEPH_OPT_NOMSGAUTH       (1<<4) /* don't require msg signing feat */
 #define CEPH_OPT_TCP_NODELAY     (1<<5) /* TCP_NODELAY on TCP sockets */
+#define CEPH_OPT_NOMSGSIGN       (1<<6) /* don't sign msgs */
 
 #define CEPH_OPT_DEFAULT   (CEPH_OPT_TCP_NODELAY)
 
@@ -137,6 +138,7 @@ struct ceph_client {
 #endif
 };
 
+#define from_msgr(ms)  container_of(ms, struct ceph_client, msgr)
 
 
 /*
index b2371d9b51fac6ea69032576ac1c3cddfc104016..71b1d6cdcb5d1fc53dd45fa3950e138cfac19dfe 100644 (file)
@@ -43,10 +43,9 @@ struct ceph_connection_operations {
        struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
                                        struct ceph_msg_header *hdr,
                                        int *skip);
-       int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg);
 
-       int (*check_message_signature) (struct ceph_connection *con,
-                                       struct ceph_msg *msg);
+       int (*sign_message) (struct ceph_msg *msg);
+       int (*check_message_signature) (struct ceph_msg *msg);
 };
 
 /* use format string %s%d */
@@ -58,8 +57,6 @@ struct ceph_messenger {
 
        atomic_t stopping;
        possible_net_t net;
-       bool nocrc;
-       bool tcp_nodelay;
 
        /*
         * the global_seq counts connections i (attempt to) initiate
@@ -67,9 +64,6 @@ struct ceph_messenger {
         */
        u32 global_seq;
        spinlock_t global_seq_lock;
-
-       u64 supported_features;
-       u64 required_features;
 };
 
 enum ceph_msg_data_type {
@@ -268,11 +262,7 @@ extern void ceph_msgr_exit(void);
 extern void ceph_msgr_flush(void);
 
 extern void ceph_messenger_init(struct ceph_messenger *msgr,
-                       struct ceph_entity_addr *myaddr,
-                       u64 supported_features,
-                       u64 required_features,
-                       bool nocrc,
-                       bool tcp_nodelay);
+                               struct ceph_entity_addr *myaddr);
 extern void ceph_messenger_fini(struct ceph_messenger *msgr);
 
 extern void ceph_con_init(struct ceph_connection *con, void *private,
index ba6eb17226da424d59bb462a2089186bbd3233af..10d87753ed8737329c244b1ae7718e95c8abd118 100644 (file)
@@ -8,6 +8,7 @@
 
 #include <linux/ceph/decode.h>
 #include <linux/ceph/auth.h>
+#include <linux/ceph/libceph.h>
 #include <linux/ceph/messenger.h>
 
 #include "crypto.h"
@@ -279,6 +280,15 @@ bad:
        return -EINVAL;
 }
 
+static void ceph_x_authorizer_cleanup(struct ceph_x_authorizer *au)
+{
+       ceph_crypto_key_destroy(&au->session_key);
+       if (au->buf) {
+               ceph_buffer_put(au->buf);
+               au->buf = NULL;
+       }
+}
+
 static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
                                   struct ceph_x_ticket_handler *th,
                                   struct ceph_x_authorizer *au)
@@ -297,7 +307,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
        ceph_crypto_key_destroy(&au->session_key);
        ret = ceph_crypto_key_clone(&au->session_key, &th->session_key);
        if (ret)
-               return ret;
+               goto out_au;
 
        maxlen = sizeof(*msg_a) + sizeof(msg_b) +
                ceph_x_encrypt_buflen(ticket_blob_len);
@@ -309,8 +319,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
        if (!au->buf) {
                au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
                if (!au->buf) {
-                       ceph_crypto_key_destroy(&au->session_key);
-                       return -ENOMEM;
+                       ret = -ENOMEM;
+                       goto out_au;
                }
        }
        au->service = th->service;
@@ -340,7 +350,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
        ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
                             p, end - p);
        if (ret < 0)
-               goto out_buf;
+               goto out_au;
        p += ret;
        au->buf->vec.iov_len = p - au->buf->vec.iov_base;
        dout(" built authorizer nonce %llx len %d\n", au->nonce,
@@ -348,9 +358,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
        BUG_ON(au->buf->vec.iov_len > maxlen);
        return 0;
 
-out_buf:
-       ceph_buffer_put(au->buf);
-       au->buf = NULL;
+out_au:
+       ceph_x_authorizer_cleanup(au);
        return ret;
 }
 
@@ -624,8 +633,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac,
 {
        struct ceph_x_authorizer *au = (void *)a;
 
-       ceph_crypto_key_destroy(&au->session_key);
-       ceph_buffer_put(au->buf);
+       ceph_x_authorizer_cleanup(au);
        kfree(au);
 }
 
@@ -653,8 +661,7 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
                remove_ticket_handler(ac, th);
        }
 
-       if (xi->auth_authorizer.buf)
-               ceph_buffer_put(xi->auth_authorizer.buf);
+       ceph_x_authorizer_cleanup(&xi->auth_authorizer);
 
        kfree(ac->private);
        ac->private = NULL;
@@ -691,8 +698,10 @@ static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
                               struct ceph_msg *msg)
 {
        int ret;
-       if (!auth->authorizer)
+
+       if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
                return 0;
+
        ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
                              msg, &msg->footer.sig);
        if (ret < 0)
@@ -707,8 +716,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
        __le64 sig_check;
        int ret;
 
-       if (!auth->authorizer)
+       if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
                return 0;
+
        ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
                              msg, &sig_check);
        if (ret < 0)
index 78f098a20796d7dd0d71b364c5de3d99239dc4ac..bcbec33c6a14a8f5d962fb1d803a6503a6e11f9b 100644 (file)
@@ -245,6 +245,8 @@ enum {
        Opt_nocrc,
        Opt_cephx_require_signatures,
        Opt_nocephx_require_signatures,
+       Opt_cephx_sign_messages,
+       Opt_nocephx_sign_messages,
        Opt_tcp_nodelay,
        Opt_notcp_nodelay,
 };
@@ -267,6 +269,8 @@ static match_table_t opt_tokens = {
        {Opt_nocrc, "nocrc"},
        {Opt_cephx_require_signatures, "cephx_require_signatures"},
        {Opt_nocephx_require_signatures, "nocephx_require_signatures"},
+       {Opt_cephx_sign_messages, "cephx_sign_messages"},
+       {Opt_nocephx_sign_messages, "nocephx_sign_messages"},
        {Opt_tcp_nodelay, "tcp_nodelay"},
        {Opt_notcp_nodelay, "notcp_nodelay"},
        {-1, NULL}
@@ -491,6 +495,12 @@ ceph_parse_options(char *options, const char *dev_name,
                case Opt_nocephx_require_signatures:
                        opt->flags |= CEPH_OPT_NOMSGAUTH;
                        break;
+               case Opt_cephx_sign_messages:
+                       opt->flags &= ~CEPH_OPT_NOMSGSIGN;
+                       break;
+               case Opt_nocephx_sign_messages:
+                       opt->flags |= CEPH_OPT_NOMSGSIGN;
+                       break;
 
                case Opt_tcp_nodelay:
                        opt->flags |= CEPH_OPT_TCP_NODELAY;
@@ -534,6 +544,8 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
                seq_puts(m, "nocrc,");
        if (opt->flags & CEPH_OPT_NOMSGAUTH)
                seq_puts(m, "nocephx_require_signatures,");
+       if (opt->flags & CEPH_OPT_NOMSGSIGN)
+               seq_puts(m, "nocephx_sign_messages,");
        if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
                seq_puts(m, "notcp_nodelay,");
 
@@ -596,11 +608,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
        if (ceph_test_opt(client, MYIP))
                myaddr = &client->options->my_addr;
 
-       ceph_messenger_init(&client->msgr, myaddr,
-               client->supported_features,
-               client->required_features,
-               ceph_test_opt(client, NOCRC),
-               ceph_test_opt(client, TCP_NODELAY));
+       ceph_messenger_init(&client->msgr, myaddr);
 
        /* subsystems */
        err = ceph_monc_init(&client->monc, client);
index d1498224c49d4c6c18e082ea908cb3e5a8fed30c..2e9cab09f37ba65c9c5961c12e10d2aded02413f 100644 (file)
@@ -16,8 +16,10 @@ struct ceph_crypto_key {
 
 static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key)
 {
-       if (key)
+       if (key) {
                kfree(key->key);
+               key->key = NULL;
+       }
 }
 
 int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
index b9b0e3b5da49f84d9fb1775e75aca83f55e80b2d..9981039ef4ffcca29081b83a5e06a81ce6871f20 100644 (file)
@@ -509,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
                return ret;
        }
 
-       if (con->msgr->tcp_nodelay) {
+       if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
                int optval = 1;
 
                ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
@@ -637,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con)
 static void ceph_msg_remove(struct ceph_msg *msg)
 {
        list_del_init(&msg->list_head);
-       BUG_ON(msg->con == NULL);
-       msg->con->ops->put(msg->con);
-       msg->con = NULL;
 
        ceph_msg_put(msg);
 }
@@ -662,15 +659,14 @@ static void reset_connection(struct ceph_connection *con)
 
        if (con->in_msg) {
                BUG_ON(con->in_msg->con != con);
-               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               con->ops->put(con);
        }
 
        con->connect_seq = 0;
        con->out_seq = 0;
        if (con->out_msg) {
+               BUG_ON(con->out_msg->con != con);
                ceph_msg_put(con->out_msg);
                con->out_msg = NULL;
        }
@@ -1205,7 +1201,7 @@ static void prepare_write_message_footer(struct ceph_connection *con)
        con->out_kvec[v].iov_base = &m->footer;
        if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
                if (con->ops->sign_message)
-                       con->ops->sign_message(con, m);
+                       con->ops->sign_message(m);
                else
                        m->footer.sig = 0;
                con->out_kvec[v].iov_len = sizeof(m->footer);
@@ -1432,7 +1428,8 @@ static int prepare_write_connect(struct ceph_connection *con)
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
 
-       con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
+       con->out_connect.features =
+           cpu_to_le64(from_msgr(con->msgr)->supported_features);
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1527,7 +1524,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
        struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       bool do_datacrc = !con->msgr->nocrc;
+       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
        u32 crc;
 
        dout("%s %p msg %p\n", __func__, con, msg);
@@ -1552,8 +1549,8 @@ static int write_partial_message_data(struct ceph_connection *con)
                bool need_crc;
                int ret;
 
-               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
-                                                       &last_piece);
+               page = ceph_msg_data_next(cursor, &page_offset, &length,
+                                         &last_piece);
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
                                        length, !last_piece);
                if (ret <= 0) {
@@ -1564,7 +1561,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                }
                if (do_datacrc && cursor->need_crc)
                        crc = ceph_crc32c_page(crc, page, page_offset, length);
-               need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
+               need_crc = ceph_msg_data_advance(cursor, (size_t)ret);
        }
 
        dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2005,8 +2002,8 @@ static int process_banner(struct ceph_connection *con)
 
 static int process_connect(struct ceph_connection *con)
 {
-       u64 sup_feat = con->msgr->supported_features;
-       u64 req_feat = con->msgr->required_features;
+       u64 sup_feat = from_msgr(con->msgr)->supported_features;
+       u64 req_feat = from_msgr(con->msgr)->required_features;
        u64 server_feat = ceph_sanitize_features(
                                le64_to_cpu(con->in_reply.features));
        int ret;
@@ -2232,7 +2229,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->in_msg;
        struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       const bool do_datacrc = !con->msgr->nocrc;
+       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
        struct page *page;
        size_t page_offset;
        size_t length;
@@ -2246,8 +2243,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
        if (do_datacrc)
                crc = con->in_data_crc;
        while (cursor->resid) {
-               page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
-                                                       NULL);
+               page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
                if (ret <= 0) {
                        if (do_datacrc)
@@ -2258,7 +2254,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 
                if (do_datacrc)
                        crc = ceph_crc32c_page(crc, page, page_offset, ret);
-               (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
+               (void) ceph_msg_data_advance(cursor, (size_t)ret);
        }
        if (do_datacrc)
                con->in_data_crc = crc;
@@ -2278,7 +2274,7 @@ static int read_partial_message(struct ceph_connection *con)
        int end;
        int ret;
        unsigned int front_len, middle_len, data_len;
-       bool do_datacrc = !con->msgr->nocrc;
+       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
        bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
        u64 seq;
        u32 crc;
@@ -2423,7 +2419,7 @@ static int read_partial_message(struct ceph_connection *con)
        }
 
        if (need_sign && con->ops->check_message_signature &&
-           con->ops->check_message_signature(con, m)) {
+           con->ops->check_message_signature(m)) {
                pr_err("read_partial_message %p signature check failed\n", m);
                return -EBADMSG;
        }
@@ -2438,13 +2434,10 @@ static int read_partial_message(struct ceph_connection *con)
  */
 static void process_message(struct ceph_connection *con)
 {
-       struct ceph_msg *msg;
+       struct ceph_msg *msg = con->in_msg;
 
        BUG_ON(con->in_msg->con != con);
-       con->in_msg->con = NULL;
-       msg = con->in_msg;
        con->in_msg = NULL;
-       con->ops->put(con);
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
@@ -2677,7 +2670,7 @@ more:
                if (ret <= 0) {
                        switch (ret) {
                        case -EBADMSG:
-                               con->error_msg = "bad crc";
+                               con->error_msg = "bad crc/signature";
                                /* fall through */
                        case -EBADE:
                                ret = -EIO;
@@ -2918,10 +2911,8 @@ static void con_fault(struct ceph_connection *con)
 
        if (con->in_msg) {
                BUG_ON(con->in_msg->con != con);
-               con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               con->ops->put(con);
        }
 
        /* Requeue anything that hasn't been acked */
@@ -2952,15 +2943,8 @@ static void con_fault(struct ceph_connection *con)
  * initialize a new messenger instance
  */
 void ceph_messenger_init(struct ceph_messenger *msgr,
-                       struct ceph_entity_addr *myaddr,
-                       u64 supported_features,
-                       u64 required_features,
-                       bool nocrc,
-                       bool tcp_nodelay)
+                        struct ceph_entity_addr *myaddr)
 {
-       msgr->supported_features = supported_features;
-       msgr->required_features = required_features;
-
        spin_lock_init(&msgr->global_seq_lock);
 
        if (myaddr)
@@ -2970,8 +2954,6 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
        msgr->inst.addr.type = 0;
        get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
        encode_my_addr(msgr);
-       msgr->nocrc = nocrc;
-       msgr->tcp_nodelay = tcp_nodelay;
 
        atomic_set(&msgr->stopping, 0);
        write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
@@ -2986,6 +2968,15 @@ void ceph_messenger_fini(struct ceph_messenger *msgr)
 }
 EXPORT_SYMBOL(ceph_messenger_fini);
 
+static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
+{
+       if (msg->con)
+               msg->con->ops->put(msg->con);
+
+       msg->con = con ? con->ops->get(con) : NULL;
+       BUG_ON(msg->con != con);
+}
+
 static void clear_standby(struct ceph_connection *con)
 {
        /* come back from STANDBY? */
@@ -3017,9 +3008,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
                return;
        }
 
-       BUG_ON(msg->con != NULL);
-       msg->con = con->ops->get(con);
-       BUG_ON(msg->con == NULL);
+       msg_con_set(msg, con);
 
        BUG_ON(!list_empty(&msg->list_head));
        list_add_tail(&msg->list_head, &con->out_queue);
@@ -3047,16 +3036,15 @@ void ceph_msg_revoke(struct ceph_msg *msg)
 {
        struct ceph_connection *con = msg->con;
 
-       if (!con)
+       if (!con) {
+               dout("%s msg %p null con\n", __func__, msg);
                return;         /* Message not in our possession */
+       }
 
        mutex_lock(&con->mutex);
        if (!list_empty(&msg->list_head)) {
                dout("%s %p msg %p - was on queue\n", __func__, con, msg);
                list_del_init(&msg->list_head);
-               BUG_ON(msg->con == NULL);
-               msg->con->ops->put(msg->con);
-               msg->con = NULL;
                msg->hdr.seq = 0;
 
                ceph_msg_put(msg);
@@ -3080,16 +3068,13 @@ void ceph_msg_revoke(struct ceph_msg *msg)
  */
 void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 {
-       struct ceph_connection *con;
+       struct ceph_connection *con = msg->con;
 
-       BUG_ON(msg == NULL);
-       if (!msg->con) {
+       if (!con) {
                dout("%s msg %p null con\n", __func__, msg);
-
                return;         /* Message not in our possession */
        }
 
-       con = msg->con;
        mutex_lock(&con->mutex);
        if (con->in_msg == msg) {
                unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
@@ -3335,9 +3320,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
        }
        if (msg) {
                BUG_ON(*skip);
+               msg_con_set(msg, con);
                con->in_msg = msg;
-               con->in_msg->con = con->ops->get(con);
-               BUG_ON(con->in_msg->con == NULL);
        } else {
                /*
                 * Null message pointer means either we should skip
@@ -3384,6 +3368,8 @@ static void ceph_msg_release(struct kref *kref)
        dout("%s %p\n", __func__, m);
        WARN_ON(!list_empty(&m->list_head));
 
+       msg_con_set(m, NULL);
+
        /* drop middle, data, if any */
        if (m->middle) {
                ceph_buffer_put(m->middle);
index f79ccac6699fb7b171b680261db9000f7fe4c70c..f8f235930d887adf1df94314b18c719638328df0 100644 (file)
@@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
 }
 #endif /* CONFIG_BLOCK */
 
-#define osd_req_op_data(oreq, whch, typ, fld)  \
-       ({                                              \
-               BUG_ON(whch >= (oreq)->r_num_ops);      \
-               &(oreq)->r_ops[whch].typ.fld;           \
-       })
+#define osd_req_op_data(oreq, whch, typ, fld)                          \
+({                                                                     \
+       struct ceph_osd_request *__oreq = (oreq);                       \
+       unsigned int __whch = (whch);                                   \
+       BUG_ON(__whch >= __oreq->r_num_ops);                            \
+       &__oreq->r_ops[__whch].typ.fld;                                 \
+})
 
 static struct ceph_osd_data *
 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
@@ -1750,8 +1752,7 @@ static void complete_request(struct ceph_osd_request *req)
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
  */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
-                        struct ceph_connection *con)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
        void *p, *end;
        struct ceph_osd_request *req;
@@ -2807,7 +2808,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                ceph_osdc_handle_map(osdc, msg);
                break;
        case CEPH_MSG_OSD_OPREPLY:
-               handle_reply(osdc, msg, con);
+               handle_reply(osdc, msg);
                break;
        case CEPH_MSG_WATCH_NOTIFY:
                handle_watch_notify(osdc, msg);
@@ -2849,9 +2850,6 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                goto out;
        }
 
-       if (req->r_reply->con)
-               dout("%s revoking msg %p from old con %p\n", __func__,
-                    req->r_reply, req->r_reply->con);
        ceph_msg_revoke_incoming(req->r_reply);
 
        if (front_len > req->r_reply->front_alloc_len) {
@@ -2978,17 +2976,19 @@ static int invalidate_authorizer(struct ceph_connection *con)
        return ceph_monc_validate_auth(&osdc->client->monc);
 }
 
-static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+static int osd_sign_message(struct ceph_msg *msg)
 {
-       struct ceph_osd *o = con->private;
+       struct ceph_osd *o = msg->con->private;
        struct ceph_auth_handshake *auth = &o->o_auth;
+
        return ceph_auth_sign_message(auth, msg);
 }
 
-static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+static int osd_check_message_signature(struct ceph_msg *msg)
 {
-       struct ceph_osd *o = con->private;
+       struct ceph_osd *o = msg->con->private;
        struct ceph_auth_handshake *auth = &o->o_auth;
+
        return ceph_auth_check_message_signature(auth, msg);
 }
 
@@ -3000,7 +3000,7 @@ static const struct ceph_connection_operations osd_con_ops = {
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .alloc_msg = alloc_msg,
-       .sign_message = sign_message,
-       .check_message_signature = check_message_signature,
+       .sign_message = osd_sign_message,
+       .check_message_signature = osd_check_message_signature,
        .fault = osd_reset,
 };