Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
author    Linus Torvalds <torvalds@linux-foundation.org>
          Wed, 11 Aug 2010 16:18:32 +0000 (09:18 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
          Wed, 11 Aug 2010 16:18:32 +0000 (09:18 -0700)
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (39 commits)
  ceph: generalize mon requests, add pool op support
  ceph: only queue async writeback on cap revocation if there is dirty data
  ceph: do not ignore osd_idle_ttl mount option
  ceph: constify dentry_operations
  ceph: whitespace cleanup
  ceph: add flock/fcntl lock support
  ceph: define on-wire types, constants for file locking support
  ceph: add CEPH_FEATURE_FLOCK to the supported feature bits
  ceph: support v2 reconnect encoding
  ceph: support v2 client_caps encoding
  ceph: move AES iv definition to shared header
  ceph: fix decoding of pool snap info
  ceph: make ->sync_fs not wait if wait==0
  ceph: warn on missing snap realm
  ceph: print useful error message when crush rule not found
  ceph: use %pU to print uuid (fsid)
  ceph: sync header defs with server code
  ceph: clean up header guards
  ceph: strip misleading/obsolete version, feature info
  ceph: specify supported features in super.h
  ...

39 files changed:
fs/ceph/Makefile
fs/ceph/addr.c
fs/ceph/armor.c
fs/ceph/auth.c
fs/ceph/auth_x.c
fs/ceph/buffer.c
fs/ceph/caps.c
fs/ceph/ceph_frag.h
fs/ceph/ceph_fs.c
fs/ceph/ceph_fs.h
fs/ceph/ceph_hash.h
fs/ceph/ceph_strings.c
fs/ceph/crush/crush.h
fs/ceph/crush/hash.h
fs/ceph/crush/mapper.h
fs/ceph/crypto.c
fs/ceph/crypto.h
fs/ceph/debugfs.c
fs/ceph/decode.h
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/ioctl.c
fs/ceph/ioctl.h
fs/ceph/locks.c [new file with mode: 0644]
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/mdsmap.c
fs/ceph/mdsmap.h
fs/ceph/messenger.c
fs/ceph/mon_client.c
fs/ceph/mon_client.h
fs/ceph/msgr.h
fs/ceph/osd_client.c
fs/ceph/osdmap.c
fs/ceph/rados.h
fs/ceph/super.c
fs/ceph/super.h
fs/ceph/xattr.c

diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 6a660e610be8e4a40dd0d8dd2655fcedff9a16d9..278e1172600dc3a3d5acba3654c53719d6f38697 100644
@@ -6,7 +6,7 @@ ifneq ($(KERNELRELEASE),)
 
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
-ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
+ceph-objs := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
        export.o caps.o snap.o xattr.o \
        messenger.o msgpool.o buffer.o pagelist.o \
        mds_client.o mdsmap.o \
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index d9c60b84949a6c6a0f807ab59a234f2760fca90d..5598a0d02295d11eaa3646e137497a083910251e 100644
@@ -309,7 +309,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
                        zero_user_segment(page, s, PAGE_CACHE_SIZE);
                }
 
-               if (add_to_page_cache_lru(page, mapping, page->index, GFP_NOFS)) {
+               if (add_to_page_cache_lru(page, mapping, page->index,
+                                         GFP_NOFS)) {
                        page_cache_release(page);
                        dout("readpages %p add_to_page_cache failed %p\n",
                             inode, page);
@@ -552,7 +553,7 @@ static void writepages_finish(struct ceph_osd_request *req,
                 * page truncation thread, possibly losing some data that
                 * raced its way in
                 */
-               if ((issued & CEPH_CAP_FILE_CACHE) == 0)
+               if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
                        generic_error_remove_page(inode->i_mapping, page);
 
                unlock_page(page);
@@ -797,9 +798,12 @@ get_more_pages:
                        dout("%p will write page %p idx %lu\n",
                             inode, page, page->index);
 
-                       writeback_stat = atomic_long_inc_return(&client->writeback_count);
-                       if (writeback_stat > CONGESTION_ON_THRESH(client->mount_args->congestion_kb)) {
-                               set_bdi_congested(&client->backing_dev_info, BLK_RW_ASYNC);
+                       writeback_stat =
+                              atomic_long_inc_return(&client->writeback_count);
+                       if (writeback_stat > CONGESTION_ON_THRESH(
+                                   client->mount_args->congestion_kb)) {
+                               set_bdi_congested(&client->backing_dev_info,
+                                                 BLK_RW_ASYNC);
                        }
 
                        set_page_writeback(page);
@@ -1036,7 +1040,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
                *pagep = page;
 
                dout("write_begin file %p inode %p page %p %d~%d\n", file,
-               inode, page, (int)pos, (int)len);
+                    inode, page, (int)pos, (int)len);
 
                r = ceph_update_writeable_page(file, pos, len, page);
        } while (r == -EAGAIN);
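
A note on the writeback throttling above: writeback_count is compared against a
threshold derived from the writeback_congestion_kb mount option. The ON/OFF
threshold macros are defined near the top of addr.c (not shown in these hunks);
roughly, converting kilobytes to pages:

    #define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
    #define CONGESTION_OFF_THRESH(congestion_kb)                            \
            (CONGESTION_ON_THRESH(congestion_kb) -                          \
             (CONGESTION_ON_THRESH(congestion_kb) >> 2))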
diff --git a/fs/ceph/armor.c b/fs/ceph/armor.c
index 67b2c030924b8bfd612c568d96a2ca0eb147b7a4..eb2a666b0be7011e9b27618c62060416543cb706 100644
@@ -1,11 +1,15 @@
 
 #include <linux/errno.h>
 
+int ceph_armor(char *dst, const char *src, const char *end);
+int ceph_unarmor(char *dst, const char *src, const char *end);
+
 /*
  * base64 encode/decode.
  */
 
-const char *pem_key = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+static const char *pem_key =
+       "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
 
 static int encode_bits(int c)
 {
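
With pem_key now static and the prototypes declared above, the base64 helpers
are self-contained. A hypothetical caller (buffer names invented; sizing assumes
the usual four-output-bytes-per-three-input base64 expansion, with slack for any
line breaks the encoder may emit):

    char secret[16];                        /* raw bytes to armor */
    char armored[2 * sizeof(secret) + 4];   /* generous upper bound */
    int len;

    len = ceph_armor(armored, secret, secret + sizeof(secret));
    if (len >= 0)
            armored[len] = '\0';            /* len = characters written */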
diff --git a/fs/ceph/auth.c b/fs/ceph/auth.c
index 89490beaf5374a08bd933686fd6482c608d6d3a2..6d2e30600627e71d25c2f93d5862952a078472b8 100644
@@ -20,7 +20,7 @@ static u32 supported_protocols[] = {
        CEPH_AUTH_CEPHX
 };
 
-int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol)
+static int ceph_auth_init_protocol(struct ceph_auth_client *ac, int protocol)
 {
        switch (protocol) {
        case CEPH_AUTH_NONE:
@@ -133,8 +133,8 @@ bad:
        return -ERANGE;
 }
 
-int ceph_build_auth_request(struct ceph_auth_client *ac,
-                          void *msg_buf, size_t msg_len)
+static int ceph_build_auth_request(struct ceph_auth_client *ac,
+                                  void *msg_buf, size_t msg_len)
 {
        struct ceph_mon_request_header *monhdr = msg_buf;
        void *p = monhdr + 1;
diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c
index 6d44053ecff1f08af7bee8546a0435ac55a0e1e2..582e0b2caf8a3ce6c2b943c0980c119be7950a63 100644
@@ -87,8 +87,8 @@ static int ceph_x_decrypt(struct ceph_crypto_key *secret,
 /*
  * get existing (or insert new) ticket handler
  */
-struct ceph_x_ticket_handler *get_ticket_handler(struct ceph_auth_client *ac,
-                                                int service)
+static struct ceph_x_ticket_handler *
+get_ticket_handler(struct ceph_auth_client *ac, int service)
 {
        struct ceph_x_ticket_handler *th;
        struct ceph_x_info *xi = ac->private;
@@ -429,7 +429,7 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
                auth->struct_v = 1;
                auth->key = 0;
                for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++)
-                       auth->key ^= *u;
+                       auth->key ^= *(__le64 *)u;
                dout(" server_challenge %llx client_challenge %llx key %llx\n",
                     xi->server_challenge, le64_to_cpu(auth->client_challenge),
                     le64_to_cpu(auth->key));
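
The cast added above is an endianness/sparse fix rather than a behavior change
on little-endian hosts: auth->key is __le64, so each 64-bit chunk of the
encrypted blob must be folded in as a little-endian value. The fold, isolated
(same logic as the hunk):

    __le64 key = 0;
    u64 *u;

    for (u = (u64 *)tmp_enc; u + 1 <= (u64 *)(tmp_enc + ret); u++)
            key ^= *(__le64 *)u;    /* XOR itself is byte-order neutral;
                                     * the cast keeps the type __le64-clean */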
diff --git a/fs/ceph/buffer.c b/fs/ceph/buffer.c
index c67535d70aa68a5ab3869f9dce4cc8bbe448e3e0..cd39f17021de9ab6cde35ce444a727970f4f8fab 100644
@@ -47,22 +47,6 @@ void ceph_buffer_release(struct kref *kref)
        kfree(b);
 }
 
-int ceph_buffer_alloc(struct ceph_buffer *b, int len, gfp_t gfp)
-{
-       b->vec.iov_base = kmalloc(len, gfp | __GFP_NOWARN);
-       if (b->vec.iov_base) {
-               b->is_vmalloc = false;
-       } else {
-               b->vec.iov_base = __vmalloc(len, gfp, PAGE_KERNEL);
-               b->is_vmalloc = true;
-       }
-       if (!b->vec.iov_base)
-               return -ENOMEM;
-       b->alloc_len = len;
-       b->vec.iov_len = len;
-       return 0;
-}
-
 int ceph_decode_buffer(struct ceph_buffer **b, void **p, void *end)
 {
        size_t len;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b81be9a5648792c3e6988855a903fb65473fa9c9..7bf182b0397396f961151a1c1b59218bc1cb2749 100644
@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
        return cap_str[i];
 }
 
-/*
- * Cap reservations
- *
- * Maintain a global pool of preallocated struct ceph_caps, referenced
- * by struct ceph_caps_reservations.  This ensures that we preallocate
- * memory needed to successfully process an MDS response.  (If an MDS
- * sends us cap information and we fail to process it, we will have
- * problems due to the client and MDS being out of sync.)
- *
- * Reservations are 'owned' by a ceph_cap_reservation context.
- */
-static spinlock_t caps_list_lock;
-static struct list_head caps_list;  /* unused (reserved or unreserved) */
-static int caps_total_count;        /* total caps allocated */
-static int caps_use_count;          /* in use */
-static int caps_reserve_count;      /* unused, reserved */
-static int caps_avail_count;        /* unused, unreserved */
-static int caps_min_count;          /* keep at least this many (unreserved) */
-
-void __init ceph_caps_init(void)
+void ceph_caps_init(struct ceph_mds_client *mdsc)
 {
-       INIT_LIST_HEAD(&caps_list);
-       spin_lock_init(&caps_list_lock);
+       INIT_LIST_HEAD(&mdsc->caps_list);
+       spin_lock_init(&mdsc->caps_list_lock);
 }
 
-void ceph_caps_finalize(void)
+void ceph_caps_finalize(struct ceph_mds_client *mdsc)
 {
        struct ceph_cap *cap;
 
-       spin_lock(&caps_list_lock);
-       while (!list_empty(&caps_list)) {
-               cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+       spin_lock(&mdsc->caps_list_lock);
+       while (!list_empty(&mdsc->caps_list)) {
+               cap = list_first_entry(&mdsc->caps_list,
+                                      struct ceph_cap, caps_item);
                list_del(&cap->caps_item);
                kmem_cache_free(ceph_cap_cachep, cap);
        }
-       caps_total_count = 0;
-       caps_avail_count = 0;
-       caps_use_count = 0;
-       caps_reserve_count = 0;
-       caps_min_count = 0;
-       spin_unlock(&caps_list_lock);
+       mdsc->caps_total_count = 0;
+       mdsc->caps_avail_count = 0;
+       mdsc->caps_use_count = 0;
+       mdsc->caps_reserve_count = 0;
+       mdsc->caps_min_count = 0;
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(int delta)
+void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
 {
-       spin_lock(&caps_list_lock);
-       caps_min_count += delta;
-       BUG_ON(caps_min_count < 0);
-       spin_unlock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_min_count += delta;
+       BUG_ON(mdsc->caps_min_count < 0);
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
-int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+                     struct ceph_cap_reservation *ctx, int need)
 {
        int i;
        struct ceph_cap *cap;
@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
        dout("reserve caps ctx=%p need=%d\n", ctx, need);
 
        /* first reserve any caps that are already allocated */
-       spin_lock(&caps_list_lock);
-       if (caps_avail_count >= need)
+       spin_lock(&mdsc->caps_list_lock);
+       if (mdsc->caps_avail_count >= need)
                have = need;
        else
-               have = caps_avail_count;
-       caps_avail_count -= have;
-       caps_reserve_count += have;
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+               have = mdsc->caps_avail_count;
+       mdsc->caps_avail_count -= have;
+       mdsc->caps_reserve_count += have;
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                        mdsc->caps_reserve_count +
+                                        mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 
        for (i = have; i < need; i++) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
        }
        BUG_ON(have + alloc != need);
 
-       spin_lock(&caps_list_lock);
-       caps_total_count += alloc;
-       caps_reserve_count += alloc;
-       list_splice(&newcaps, &caps_list);
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_total_count += alloc;
+       mdsc->caps_reserve_count += alloc;
+       list_splice(&newcaps, &mdsc->caps_list);
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                        mdsc->caps_reserve_count +
+                                        mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 
        ctx->count = need;
        dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
-            ctx, caps_total_count, caps_use_count, caps_reserve_count,
-            caps_avail_count);
+            ctx, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
        return 0;
 
 out_alloc_count:
@@ -220,26 +205,29 @@ out_alloc_count:
        return ret;
 }
 
-int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
+int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+                       struct ceph_cap_reservation *ctx)
 {
        dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
        if (ctx->count) {
-               spin_lock(&caps_list_lock);
-               BUG_ON(caps_reserve_count < ctx->count);
-               caps_reserve_count -= ctx->count;
-               caps_avail_count += ctx->count;
+               spin_lock(&mdsc->caps_list_lock);
+               BUG_ON(mdsc->caps_reserve_count < ctx->count);
+               mdsc->caps_reserve_count -= ctx->count;
+               mdsc->caps_avail_count += ctx->count;
                ctx->count = 0;
                dout("unreserve caps %d = %d used + %d resv + %d avail\n",
-                    caps_total_count, caps_use_count, caps_reserve_count,
-                    caps_avail_count);
-               BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-                      caps_avail_count);
-               spin_unlock(&caps_list_lock);
+                    mdsc->caps_total_count, mdsc->caps_use_count,
+                    mdsc->caps_reserve_count, mdsc->caps_avail_count);
+               BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                                mdsc->caps_reserve_count +
+                                                mdsc->caps_avail_count);
+               spin_unlock(&mdsc->caps_list_lock);
        }
        return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
+static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
+                               struct ceph_cap_reservation *ctx)
 {
        struct ceph_cap *cap = NULL;
 
@@ -247,71 +235,74 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
        if (!ctx) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
                if (cap) {
-                       caps_use_count++;
-                       caps_total_count++;
+                       mdsc->caps_use_count++;
+                       mdsc->caps_total_count++;
                }
                return cap;
        }
 
-       spin_lock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
        dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-            ctx, ctx->count, caps_total_count, caps_use_count,
-            caps_reserve_count, caps_avail_count);
+            ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
        BUG_ON(!ctx->count);
-       BUG_ON(ctx->count > caps_reserve_count);
-       BUG_ON(list_empty(&caps_list));
+       BUG_ON(ctx->count > mdsc->caps_reserve_count);
+       BUG_ON(list_empty(&mdsc->caps_list));
 
        ctx->count--;
-       caps_reserve_count--;
-       caps_use_count++;
+       mdsc->caps_reserve_count--;
+       mdsc->caps_use_count++;
 
-       cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+       cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
        list_del(&cap->caps_item);
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+              mdsc->caps_reserve_count + mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
        return cap;
 }
 
-void ceph_put_cap(struct ceph_cap *cap)
+void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
 {
-       spin_lock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
        dout("put_cap %p %d = %d used + %d resv + %d avail\n",
-            cap, caps_total_count, caps_use_count,
-            caps_reserve_count, caps_avail_count);
-       caps_use_count--;
+            cap, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
+       mdsc->caps_use_count--;
        /*
         * Keep some preallocated caps around (ceph_min_count), to
         * avoid lots of free/alloc churn.
         */
-       if (caps_avail_count >= caps_reserve_count + caps_min_count) {
-               caps_total_count--;
+       if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
+                                     mdsc->caps_min_count) {
+               mdsc->caps_total_count--;
                kmem_cache_free(ceph_cap_cachep, cap);
        } else {
-               caps_avail_count++;
-               list_add(&cap->caps_item, &caps_list);
+               mdsc->caps_avail_count++;
+               list_add(&cap->caps_item, &mdsc->caps_list);
        }
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+              mdsc->caps_reserve_count + mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
 void ceph_reservation_status(struct ceph_client *client,
                             int *total, int *avail, int *used, int *reserved,
                             int *min)
 {
+       struct ceph_mds_client *mdsc = &client->mdsc;
+
        if (total)
-               *total = caps_total_count;
+               *total = mdsc->caps_total_count;
        if (avail)
-               *avail = caps_avail_count;
+               *avail = mdsc->caps_avail_count;
        if (used)
-               *used = caps_use_count;
+               *used = mdsc->caps_use_count;
        if (reserved)
-               *reserved = caps_reserve_count;
+               *reserved = mdsc->caps_reserve_count;
        if (min)
-               *min = caps_min_count;
+               *min = mdsc->caps_min_count;
 }
 
 /*
@@ -336,22 +327,29 @@ static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
        return NULL;
 }
 
+struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
+{
+       struct ceph_cap *cap;
+
+       spin_lock(&ci->vfs_inode.i_lock);
+       cap = __get_cap_for_mds(ci, mds);
+       spin_unlock(&ci->vfs_inode.i_lock);
+       return cap;
+}
+
 /*
- * Return id of any MDS with a cap, preferably FILE_WR|WRBUFFER|EXCL, else
- * -1.
+ * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
  */
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq)
+static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
 {
        struct ceph_cap *cap;
        int mds = -1;
        struct rb_node *p;
 
-       /* prefer mds with WR|WRBUFFER|EXCL caps */
+       /* prefer mds with WR|BUFFER|EXCL caps */
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                cap = rb_entry(p, struct ceph_cap, ci_node);
                mds = cap->mds;
-               if (mseq)
-                       *mseq = cap->mseq;
                if (cap->issued & (CEPH_CAP_FILE_WR |
                                   CEPH_CAP_FILE_BUFFER |
                                   CEPH_CAP_FILE_EXCL))
@@ -364,7 +362,7 @@ int ceph_get_cap_mds(struct inode *inode)
 {
        int mds;
        spin_lock(&inode->i_lock);
-       mds = __ceph_get_cap_mds(ceph_inode(inode), NULL);
+       mds = __ceph_get_cap_mds(ceph_inode(inode));
        spin_unlock(&inode->i_lock);
        return mds;
 }
@@ -483,8 +481,8 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
         * Each time we receive FILE_CACHE anew, we increment
         * i_rdcache_gen.
         */
-       if ((issued & CEPH_CAP_FILE_CACHE) &&
-           (had & CEPH_CAP_FILE_CACHE) == 0)
+       if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+           (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
                ci->i_rdcache_gen++;
 
        /*
@@ -543,7 +541,7 @@ retry:
                        new_cap = NULL;
                } else {
                        spin_unlock(&inode->i_lock);
-                       new_cap = get_cap(caps_reservation);
+                       new_cap = get_cap(mdsc, caps_reservation);
                        if (new_cap == NULL)
                                return -ENOMEM;
                        goto retry;
@@ -588,6 +586,7 @@ retry:
                } else {
                        pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
                               realmino);
+                       WARN_ON(!realm);
                }
        }
 
@@ -831,7 +830,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
 {
        int want = 0;
        int mode;
-       for (mode = 0; mode < 4; mode++)
+       for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
                if (ci->i_nr_by_mode[mode])
                        want |= ceph_caps_for_mode(mode);
        return want;
@@ -901,7 +900,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
                ci->i_auth_cap = NULL;
 
        if (removed)
-               ceph_put_cap(cap);
+               ceph_put_cap(mdsc, cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
                struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -1197,6 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
  */
 void __ceph_flush_snaps(struct ceph_inode_info *ci,
                        struct ceph_mds_session **psession)
+               __releases(ci->vfs_inode->i_lock)
+               __acquires(ci->vfs_inode->i_lock)
 {
        struct inode *inode = &ci->vfs_inode;
        int mds;
@@ -1232,7 +1233,13 @@ retry:
                BUG_ON(capsnap->dirty == 0);
 
                /* pick mds, take s_mutex */
-               mds = __ceph_get_cap_mds(ci, &mseq);
+               if (ci->i_auth_cap == NULL) {
+                       dout("no auth cap (migrating?), doing nothing\n");
+                       goto out;
+               }
+               mds = ci->i_auth_cap->session->s_mds;
+               mseq = ci->i_auth_cap->mseq;
+
                if (session && session->s_mds != mds) {
                        dout("oops, wrong session %p mutex\n", session);
                        mutex_unlock(&session->s_mutex);
@@ -1251,8 +1258,8 @@ retry:
                        }
                        /*
                         * if session == NULL, we raced against a cap
-                        * deletion.  retry, and we'll get a better
-                        * @mds value next time.
+                        * deletion or migration.  retry, and we'll
+                        * get a better @mds value next time.
                         */
                        spin_lock(&inode->i_lock);
                        goto retry;
@@ -1290,6 +1297,7 @@ retry:
        list_del_init(&ci->i_snap_flush_item);
        spin_unlock(&mdsc->snap_flush_lock);
 
+out:
        if (psession)
                *psession = session;
        else if (session) {
@@ -1435,7 +1443,6 @@ static int try_nonblocking_invalidate(struct inode *inode)
  */
 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
                     struct ceph_mds_session *session)
-       __releases(session->s_mutex)
 {
        struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1510,11 +1517,13 @@ retry_locked:
            ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
            ci->i_rdcache_gen &&                     /* may have cached pages */
            (file_wanted == 0 ||                     /* no open files */
-            (revoking & CEPH_CAP_FILE_CACHE)) &&     /*  or revoking cache */
+            (revoking & (CEPH_CAP_FILE_CACHE|
+                         CEPH_CAP_FILE_LAZYIO))) && /*  or revoking cache */
            !tried_invalidate) {
                dout("check_caps trying to invalidate on %p\n", inode);
                if (try_nonblocking_invalidate(inode) < 0) {
-                       if (revoking & CEPH_CAP_FILE_CACHE) {
+                       if (revoking & (CEPH_CAP_FILE_CACHE|
+                                       CEPH_CAP_FILE_LAZYIO)) {
                                dout("check_caps queuing invalidate\n");
                                queue_invalidate = 1;
                                ci->i_rdcache_revoking = ci->i_rdcache_gen;
@@ -2250,8 +2259,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                             struct ceph_mds_session *session,
                             struct ceph_cap *cap,
                             struct ceph_buffer *xattr_buf)
-       __releases(inode->i_lock)
-       __releases(session->s_mutex)
+               __releases(inode->i_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -2278,6 +2286,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
         * will invalidate _after_ writeback.)
         */
        if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+           (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
            !ci->i_wrbuffer_ref) {
                if (try_nonblocking_invalidate(inode) == 0) {
                        revoked_rdcache = 1;
@@ -2369,15 +2378,22 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
 
        /* revocation, grant, or no-op? */
        if (cap->issued & ~newcaps) {
-               dout("revocation: %s -> %s\n", ceph_cap_string(cap->issued),
-                    ceph_cap_string(newcaps));
-               if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
-                       writeback = 1; /* will delay ack */
-               else if (dirty & ~newcaps)
-                       check_caps = 1;  /* initiate writeback in check_caps */
-               else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
-                          revoked_rdcache)
-                       check_caps = 2;     /* send revoke ack in check_caps */
+               int revoking = cap->issued & ~newcaps;
+
+               dout("revocation: %s -> %s (revoking %s)\n",
+                    ceph_cap_string(cap->issued),
+                    ceph_cap_string(newcaps),
+                    ceph_cap_string(revoking));
+               if (revoking & used & CEPH_CAP_FILE_BUFFER)
+                       writeback = 1;  /* initiate writeback; will delay ack */
+               else if (revoking == CEPH_CAP_FILE_CACHE &&
+                        (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
+                        queue_invalidate)
+                       ; /* do nothing yet, invalidation will be queued */
+               else if (cap == ci->i_auth_cap)
+                       check_caps = 1; /* check auth cap only */
+               else
+                       check_caps = 2; /* check all caps */
                cap->issued = newcaps;
                cap->implemented |= newcaps;
        } else if (cap->issued == newcaps) {
@@ -2568,7 +2584,8 @@ static void handle_cap_trunc(struct inode *inode,
  * caller holds s_mutex
  */
 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
-                             struct ceph_mds_session *session)
+                             struct ceph_mds_session *session,
+                             int *open_target_sessions)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -2600,6 +2617,12 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        ci->i_cap_exporting_mds = mds;
                        ci->i_cap_exporting_mseq = mseq;
                        ci->i_cap_exporting_issued = cap->issued;
+
+                       /*
+                        * make sure we have open sessions with all possible
+                        * export targets, so that we get the matching IMPORT
+                        */
+                       *open_target_sessions = 1;
                }
                __ceph_remove_cap(cap);
        }
@@ -2675,6 +2698,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        u64 size, max_size;
        u64 tid;
        void *snaptrace;
+       size_t snaptrace_len;
+       void *flock;
+       u32 flock_len;
+       int open_target_sessions = 0;
 
        dout("handle_caps from mds%d\n", mds);
 
@@ -2683,7 +2710,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = msg->front.iov_base;
-       snaptrace = h + 1;
        op = le32_to_cpu(h->op);
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
@@ -2693,6 +2719,21 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        size = le64_to_cpu(h->size);
        max_size = le64_to_cpu(h->max_size);
 
+       snaptrace = h + 1;
+       snaptrace_len = le32_to_cpu(h->snap_trace_len);
+
+       if (le16_to_cpu(msg->hdr.version) >= 2) {
+               void *p, *end;
+
+               p = snaptrace + snaptrace_len;
+               end = msg->front.iov_base + msg->front.iov_len;
+               ceph_decode_32_safe(&p, end, flock_len, bad);
+               flock = p;
+       } else {
+               flock = NULL;
+               flock_len = 0;
+       }
+
        mutex_lock(&session->s_mutex);
        session->s_seq++;
        dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -2714,7 +2755,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                 * along for the mds (who clearly thinks we still have this
                 * cap).
                 */
-               ceph_add_cap_releases(mdsc, session, -1);
+               ceph_add_cap_releases(mdsc, session);
                ceph_send_cap_releases(mdsc, session);
                goto done;
        }
@@ -2726,12 +2767,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                goto done;
 
        case CEPH_CAP_OP_EXPORT:
-               handle_cap_export(inode, h, session);
+               handle_cap_export(inode, h, session, &open_target_sessions);
                goto done;
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, session,
-                                 snaptrace, le32_to_cpu(h->snap_trace_len));
+                                 snaptrace, snaptrace_len);
                ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
                                session);
                goto done_unlocked;
@@ -2773,6 +2814,8 @@ done:
 done_unlocked:
        if (inode)
                iput(inode);
+       if (open_target_sessions)
+               ceph_mdsc_open_export_target_sessions(mdsc, session);
        return;
 
 bad:
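
Taken together, the caps.c hunks move the cap preallocation pool from
file-scope globals into struct ceph_mds_client, so every counter access now
takes the per-mdsc caps_list_lock. The lifecycle, condensed from the functions
above (a sketch, not a new API):

    struct ceph_cap_reservation ctx = { .count = 0 };
    struct ceph_cap *cap;
    int err;

    /* before decoding an MDS reply: preallocate so later steps cannot fail */
    err = ceph_reserve_caps(mdsc, &ctx, need);

    /* while decoding: draw from the reservation; O(1), cannot fail */
    cap = get_cap(mdsc, &ctx);

    /* when done: return whatever was not consumed */
    ceph_unreserve_caps(mdsc, &ctx);

Throughout, the invariant caps_total_count == caps_use_count +
caps_reserve_count + caps_avail_count is asserted under the lock.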
diff --git a/fs/ceph/ceph_frag.h b/fs/ceph/ceph_frag.h
index 793f50cb7c224f6b56238741470c542a57a85b5b..5babb8e95352a6a7468feca70a343fafb56bddad 100644
@@ -1,5 +1,5 @@
-#ifndef _FS_CEPH_FRAG_H
-#define _FS_CEPH_FRAG_H
+#ifndef FS_CEPH_FRAG_H
+#define FS_CEPH_FRAG_H
 
 /*
  * "Frags" are a way to describe a subset of a 32-bit number space,
diff --git a/fs/ceph/ceph_fs.c b/fs/ceph/ceph_fs.c
index 79d76bc4303ff7a9ce822df4b2ac24f1a6cd9604..3ac6cc7c1156dafd78262e7290edf914c3f01e31 100644
@@ -29,46 +29,44 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
 
 int ceph_flags_to_mode(int flags)
 {
+       int mode;
+
 #ifdef O_DIRECTORY  /* fixme */
        if ((flags & O_DIRECTORY) == O_DIRECTORY)
                return CEPH_FILE_MODE_PIN;
 #endif
+       if ((flags & O_APPEND) == O_APPEND)
+               flags |= O_WRONLY;
+
+       if ((flags & O_ACCMODE) == O_RDWR)
+               mode = CEPH_FILE_MODE_RDWR;
+       else if ((flags & O_ACCMODE) == O_WRONLY)
+               mode = CEPH_FILE_MODE_WR;
+       else
+               mode = CEPH_FILE_MODE_RD;
+
 #ifdef O_LAZY
        if (flags & O_LAZY)
-               return CEPH_FILE_MODE_LAZY;
+               mode |= CEPH_FILE_MODE_LAZY;
 #endif
-       if ((flags & O_APPEND) == O_APPEND)
-               flags |= O_WRONLY;
 
-       flags &= O_ACCMODE;
-       if ((flags & O_RDWR) == O_RDWR)
-               return CEPH_FILE_MODE_RDWR;
-       if ((flags & O_WRONLY) == O_WRONLY)
-               return CEPH_FILE_MODE_WR;
-       return CEPH_FILE_MODE_RD;
+       return mode;
 }
 
 int ceph_caps_for_mode(int mode)
 {
-       switch (mode) {
-       case CEPH_FILE_MODE_PIN:
-               return CEPH_CAP_PIN;
-       case CEPH_FILE_MODE_RD:
-               return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
+       int caps = CEPH_CAP_PIN;
+
+       if (mode & CEPH_FILE_MODE_RD)
+               caps |= CEPH_CAP_FILE_SHARED |
                        CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE;
-       case CEPH_FILE_MODE_RDWR:
-               return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
-                       CEPH_CAP_FILE_EXCL |
-                       CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE |
-                       CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
-                       CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
-                       CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
-       case CEPH_FILE_MODE_WR:
-               return CEPH_CAP_PIN | CEPH_CAP_FILE_SHARED |
-                       CEPH_CAP_FILE_EXCL |
+       if (mode & CEPH_FILE_MODE_WR)
+               caps |= CEPH_CAP_FILE_EXCL |
                        CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |
                        CEPH_CAP_AUTH_SHARED | CEPH_CAP_AUTH_EXCL |
                        CEPH_CAP_XATTR_SHARED | CEPH_CAP_XATTR_EXCL;
-       }
-       return 0;
+       if (mode & CEPH_FILE_MODE_LAZY)
+               caps |= CEPH_CAP_FILE_LAZYIO;
+
+       return caps;
 }
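
The rework above turns the file mode into a bitmask (RD and WR combinable, with
LAZY as an extra bit), so ceph_caps_for_mode can OR cap sets together instead of
switching on an enum. For example, under the apparent encoding RD=1, WR=2:

    int mode = ceph_flags_to_mode(O_RDWR); /* CEPH_FILE_MODE_RDWR == RD|WR */
    int caps = ceph_caps_for_mode(mode);   /* PIN | FILE_SHARED|RD|CACHE
                                            * | FILE_EXCL|WR|BUFFER
                                            * | AUTH_* | XATTR_* */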
diff --git a/fs/ceph/ceph_fs.h b/fs/ceph/ceph_fs.h
index 2fa992eaf7daae57217ab5e7f8b2f228299fc101..d5619ac86711a6b41c56a00da7660099352d5327 100644
@@ -9,26 +9,12 @@
  * LGPL2
  */
 
-#ifndef _FS_CEPH_CEPH_FS_H
-#define _FS_CEPH_CEPH_FS_H
+#ifndef CEPH_FS_H
+#define CEPH_FS_H
 
 #include "msgr.h"
 #include "rados.h"
 
-/*
- * Ceph release version
- */
-#define CEPH_VERSION_MAJOR 0
-#define CEPH_VERSION_MINOR 20
-#define CEPH_VERSION_PATCH 0
-
-#define _CEPH_STRINGIFY(x) #x
-#define CEPH_STRINGIFY(x) _CEPH_STRINGIFY(x)
-#define CEPH_MAKE_VERSION(x, y, z) CEPH_STRINGIFY(x) "." CEPH_STRINGIFY(y) \
-       "." CEPH_STRINGIFY(z)
-#define CEPH_VERSION CEPH_MAKE_VERSION(CEPH_VERSION_MAJOR, \
-                                      CEPH_VERSION_MINOR, CEPH_VERSION_PATCH)
-
 /*
  * subprotocol versions.  when specific messages types or high-level
  * protocols change, bump the affected components.  we keep rev
 /*
  * feature bits
  */
-#define CEPH_FEATURE_UID        1
-#define CEPH_FEATURE_NOSRCADDR  2
-#define CEPH_FEATURE_FLOCK      4
-
-#define CEPH_FEATURE_SUPPORTED_MON  CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_MON   CEPH_FEATURE_UID
-#define CEPH_FEATURE_SUPPORTED_MDS  CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR|CEPH_FEATURE_FLOCK
-#define CEPH_FEATURE_REQUIRED_MDS   CEPH_FEATURE_UID
-#define CEPH_FEATURE_SUPPORTED_OSD  CEPH_FEATURE_UID|CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_OSD   CEPH_FEATURE_UID
-#define CEPH_FEATURE_SUPPORTED_CLIENT CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_CLIENT CEPH_FEATURE_NOSRCADDR
+#define CEPH_FEATURE_UID            (1<<0)
+#define CEPH_FEATURE_NOSRCADDR      (1<<1)
+#define CEPH_FEATURE_MONCLOCKCHECK  (1<<2)
+#define CEPH_FEATURE_FLOCK          (1<<3)
 
 
 /*
@@ -96,6 +74,8 @@ int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
 #define CEPH_CRYPTO_NONE 0x0
 #define CEPH_CRYPTO_AES  0x1
 
+#define CEPH_AES_IV "cephsageyudagreg"
+
 /* security/authentication protocols */
 #define CEPH_AUTH_UNKNOWN      0x0
 #define CEPH_AUTH_NONE         0x1
@@ -275,6 +255,7 @@ extern const char *ceph_mds_state_name(int s);
 #define CEPH_LOCK_IDFT        512   /* dir frag tree */
 #define CEPH_LOCK_INEST       1024  /* mds internal */
 #define CEPH_LOCK_IXATTR      2048
+#define CEPH_LOCK_IFLOCK      4096  /* advisory file locks */
 #define CEPH_LOCK_INO         8192  /* immutable inode bits; not a lock */
 
 /* client_session ops */
@@ -316,6 +297,8 @@ enum {
        CEPH_MDS_OP_RMXATTR    = 0x01106,
        CEPH_MDS_OP_SETLAYOUT  = 0x01107,
        CEPH_MDS_OP_SETATTR    = 0x01108,
+       CEPH_MDS_OP_SETFILELOCK= 0x01109,
+       CEPH_MDS_OP_GETFILELOCK= 0x00110,
 
        CEPH_MDS_OP_MKNOD      = 0x01201,
        CEPH_MDS_OP_LINK       = 0x01202,
@@ -386,6 +369,15 @@ union ceph_mds_request_args {
        struct {
                struct ceph_file_layout layout;
        } __attribute__ ((packed)) setlayout;
+       struct {
+               __u8 rule; /* currently fcntl or flock */
+               __u8 type; /* shared, exclusive, remove*/
+               __le64 pid; /* process id requesting the lock */
+               __le64 pid_namespace;
+               __le64 start; /* initial location to lock */
+               __le64 length; /* num bytes to lock from start */
+               __u8 wait; /* will caller wait for lock to become available? */
+       } __attribute__ ((packed)) filelock_change;
 } __attribute__ ((packed));
 
 #define CEPH_MDS_FLAG_REPLAY        1  /* this is a replayed op */
@@ -480,6 +472,23 @@ struct ceph_mds_reply_dirfrag {
        __le32 dist[];
 } __attribute__ ((packed));
 
+#define CEPH_LOCK_FCNTL    1
+#define CEPH_LOCK_FLOCK    2
+
+#define CEPH_LOCK_SHARED   1
+#define CEPH_LOCK_EXCL     2
+#define CEPH_LOCK_UNLOCK   4
+
+struct ceph_filelock {
+       __le64 start;/* file offset to start lock at */
+       __le64 length; /* num bytes to lock; 0 for all following start */
+       __le64 client; /* which client holds the lock */
+       __le64 pid; /* process id holding the lock on the client */
+       __le64 pid_namespace;
+       __u8 type; /* shared lock, exclusive lock, or unlock */
+} __attribute__ ((packed));
+
+
 /* file access modes */
 #define CEPH_FILE_MODE_PIN        0
 #define CEPH_FILE_MODE_RD         1
@@ -508,9 +517,10 @@ int ceph_flags_to_mode(int flags);
 #define CEPH_CAP_SAUTH      2
 #define CEPH_CAP_SLINK      4
 #define CEPH_CAP_SXATTR     6
-#define CEPH_CAP_SFILE      8   /* goes at the end (uses >2 cap bits) */
+#define CEPH_CAP_SFILE      8
+#define CEPH_CAP_SFLOCK    20 
 
-#define CEPH_CAP_BITS       16
+#define CEPH_CAP_BITS       22
 
 /* composed values */
 #define CEPH_CAP_AUTH_SHARED  (CEPH_CAP_GSHARED  << CEPH_CAP_SAUTH)
@@ -528,6 +538,9 @@ int ceph_flags_to_mode(int flags);
 #define CEPH_CAP_FILE_BUFFER   (CEPH_CAP_GBUFFER   << CEPH_CAP_SFILE)
 #define CEPH_CAP_FILE_WREXTEND (CEPH_CAP_GWREXTEND << CEPH_CAP_SFILE)
 #define CEPH_CAP_FILE_LAZYIO   (CEPH_CAP_GLAZYIO   << CEPH_CAP_SFILE)
+#define CEPH_CAP_FLOCK_SHARED  (CEPH_CAP_GSHARED   << CEPH_CAP_SFLOCK)
+#define CEPH_CAP_FLOCK_EXCL    (CEPH_CAP_GEXCL     << CEPH_CAP_SFLOCK)
+
 
 /* cap masks (for getattr) */
 #define CEPH_STAT_CAP_INODE    CEPH_CAP_PIN
@@ -563,7 +576,8 @@ int ceph_flags_to_mode(int flags);
                              CEPH_CAP_FILE_EXCL)
 #define CEPH_CAP_ANY_WR   (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
 #define CEPH_CAP_ANY      (CEPH_CAP_ANY_RD | CEPH_CAP_ANY_EXCL | \
-                          CEPH_CAP_ANY_FILE_WR | CEPH_CAP_PIN)
+                          CEPH_CAP_ANY_FILE_WR | CEPH_CAP_FILE_LAZYIO | \
+                          CEPH_CAP_PIN)
 
 #define CEPH_CAP_LOCKS (CEPH_LOCK_IFILE | CEPH_LOCK_IAUTH | CEPH_LOCK_ILINK | \
                        CEPH_LOCK_IXATTR)
@@ -650,6 +664,16 @@ struct ceph_mds_lease {
 
 /* client reconnect */
 struct ceph_mds_cap_reconnect {
+       __le64 cap_id;
+       __le32 wanted;
+       __le32 issued;
+       __le64 snaprealm;
+       __le64 pathbase;        /* base ino for our path to this ino */
+       __le32 flock_len;       /* size of flock state blob, if any */
+} __attribute__ ((packed));
+/* followed by flock blob */
+
+struct ceph_mds_cap_reconnect_v1 {
        __le64 cap_id;
        __le32 wanted;
        __le32 issued;
@@ -658,7 +682,6 @@ struct ceph_mds_cap_reconnect {
        __le64 snaprealm;
        __le64 pathbase;        /* base ino for our path to this ino */
 } __attribute__ ((packed));
-/* followed by encoded string */
 
 struct ceph_mds_snaprealm_reconnect {
        __le64 ino;     /* snap realm base */
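
On the cap layout: each cap class holds a copy of the generic bits at its own
shift, and the new SFLOCK class sits at shift 20, which is why CEPH_CAP_BITS
grows from 16 to 22. Sketched:

    PIN    bit 0
    AUTH   generic bits << 2
    LINK   generic bits << 4
    XATTR  generic bits << 6
    FILE   generic bits << 8     (SHARED .. LAZYIO)
    FLOCK  generic bits << 20    (SHARED, EXCL only)

so, for instance, CEPH_CAP_FLOCK_EXCL == CEPH_CAP_GEXCL << 20 == 0x200000.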
diff --git a/fs/ceph/ceph_hash.h b/fs/ceph/ceph_hash.h
index 5ac470c433c94466a34960c7e06d97e2065324e1..d099c3f90236276976586dfbf505ff46141e7ddb 100644
@@ -1,5 +1,5 @@
-#ifndef _FS_CEPH_HASH_H
-#define _FS_CEPH_HASH_H
+#ifndef FS_CEPH_HASH_H
+#define FS_CEPH_HASH_H
 
 #define CEPH_STR_HASH_LINUX      0x1  /* linux dcache hash */
 #define CEPH_STR_HASH_RJENKINS   0x2  /* robert jenkins' */
diff --git a/fs/ceph/ceph_strings.c b/fs/ceph/ceph_strings.c
index 7503aee828cef9156e3a875f91e9b87e132911b9..c6179d3a26a216cb7b8f6ee0119884162887c778 100644
@@ -28,6 +28,7 @@ const char *ceph_osd_op_name(int op)
        case CEPH_OSD_OP_TRUNCATE: return "truncate";
        case CEPH_OSD_OP_ZERO: return "zero";
        case CEPH_OSD_OP_WRITEFULL: return "writefull";
+       case CEPH_OSD_OP_ROLLBACK: return "rollback";
 
        case CEPH_OSD_OP_APPEND: return "append";
        case CEPH_OSD_OP_STARTSYNC: return "startsync";
@@ -129,6 +130,8 @@ const char *ceph_mds_op_name(int op)
        case CEPH_MDS_OP_LSSNAP: return "lssnap";
        case CEPH_MDS_OP_MKSNAP: return "mksnap";
        case CEPH_MDS_OP_RMSNAP: return "rmsnap";
+       case CEPH_MDS_OP_SETFILELOCK: return "setfilelock";
+       case CEPH_MDS_OP_GETFILELOCK: return "getfilelock";
        }
        return "???";
 }
diff --git a/fs/ceph/crush/crush.h b/fs/ceph/crush/crush.h
index dcd7e752370077cd6cbba5745a0249667e04e73e..97e435b191f411380bbc546530c725093095231c 100644
@@ -1,5 +1,5 @@
-#ifndef _CRUSH_CRUSH_H
-#define _CRUSH_CRUSH_H
+#ifndef CEPH_CRUSH_CRUSH_H
+#define CEPH_CRUSH_CRUSH_H
 
 #include <linux/types.h>
 
diff --git a/fs/ceph/crush/hash.h b/fs/ceph/crush/hash.h
index ff48e110e4bb44232102363a93809eaf4e793178..91e884230d5db99cbfacc478bf8acbe2d4533908 100644
@@ -1,5 +1,5 @@
-#ifndef _CRUSH_HASH_H
-#define _CRUSH_HASH_H
+#ifndef CEPH_CRUSH_HASH_H
+#define CEPH_CRUSH_HASH_H
 
 #define CRUSH_HASH_RJENKINS1   0
 
diff --git a/fs/ceph/crush/mapper.h b/fs/ceph/crush/mapper.h
index 98e90046fd9f89a07835647ac3f845429ebe90fc..c46b99c18bb0ca772f87c567f4ab46cec54932db 100644
@@ -1,5 +1,5 @@
-#ifndef _CRUSH_MAPPER_H
-#define _CRUSH_MAPPER_H
+#ifndef CEPH_CRUSH_MAPPER_H
+#define CEPH_CRUSH_MAPPER_H
 
 /*
  * CRUSH functions for find rules and then mapping an input to an
diff --git a/fs/ceph/crypto.c b/fs/ceph/crypto.c
index f704b3b624245b7cb44efa80f8949f3e9d9715f7..a3e627f63293b2d329f620525328280a9ae15291 100644
@@ -75,10 +75,11 @@ static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void)
        return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
 }
 
-const u8 *aes_iv = "cephsageyudagreg";
+static const u8 *aes_iv = (u8 *)CEPH_AES_IV;
 
-int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len,
-                    const void *src, size_t src_len)
+static int ceph_aes_encrypt(const void *key, int key_len,
+                           void *dst, size_t *dst_len,
+                           const void *src, size_t src_len)
 {
        struct scatterlist sg_in[2], sg_out[1];
        struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
@@ -126,9 +127,10 @@ int ceph_aes_encrypt(const void *key, int key_len, void *dst, size_t *dst_len,
        return 0;
 }
 
-int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len,
-                     const void *src1, size_t src1_len,
-                     const void *src2, size_t src2_len)
+static int ceph_aes_encrypt2(const void *key, int key_len, void *dst,
+                            size_t *dst_len,
+                            const void *src1, size_t src1_len,
+                            const void *src2, size_t src2_len)
 {
        struct scatterlist sg_in[3], sg_out[1];
        struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
@@ -179,8 +181,9 @@ int ceph_aes_encrypt2(const void *key, int key_len, void *dst, size_t *dst_len,
        return 0;
 }
 
-int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len,
-                    const void *src, size_t src_len)
+static int ceph_aes_decrypt(const void *key, int key_len,
+                           void *dst, size_t *dst_len,
+                           const void *src, size_t src_len)
 {
        struct scatterlist sg_in[1], sg_out[2];
        struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
@@ -238,10 +241,10 @@ int ceph_aes_decrypt(const void *key, int key_len, void *dst, size_t *dst_len,
        return 0;
 }
 
-int ceph_aes_decrypt2(const void *key, int key_len,
-                     void *dst1, size_t *dst1_len,
-                     void *dst2, size_t *dst2_len,
-                     const void *src, size_t src_len)
+static int ceph_aes_decrypt2(const void *key, int key_len,
+                            void *dst1, size_t *dst1_len,
+                            void *dst2, size_t *dst2_len,
+                            const void *src, size_t src_len)
 {
        struct scatterlist sg_in[1], sg_out[3];
        struct crypto_blkcipher *tfm = ceph_crypto_alloc_cipher();
diff --git a/fs/ceph/crypto.h b/fs/ceph/crypto.h
index 40b502e6bd89ca16a9e1efe084e7cfec6c4719f1..bdf38607323c2da2eb87d2c3ebf7b3d158bd7d0d 100644
@@ -42,7 +42,7 @@ extern int ceph_encrypt2(struct ceph_crypto_key *secret,
                         const void *src2, size_t src2_len);
 
 /* armor.c */
-extern int ceph_armor(char *dst, const void *src, const void *end);
-extern int ceph_unarmor(void *dst, const char *src, const char *end);
+extern int ceph_armor(char *dst, const char *src, const char *end);
+extern int ceph_unarmor(char *dst, const char *src, const char *end);
 
 #endif
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index f2f5332ddbba3ca340b3ea4ba5a75a317d18053f..360c4f22718d5e079c2ce99c2484af9376751a76 100644
@@ -291,7 +291,7 @@ static int dentry_lru_show(struct seq_file *s, void *ptr)
        return 0;
 }
 
-#define DEFINE_SHOW_FUNC(name)                                                 \
+#define DEFINE_SHOW_FUNC(name)                                         \
 static int name##_open(struct inode *inode, struct file *file)         \
 {                                                                      \
        struct seq_file *sf;                                            \
@@ -361,8 +361,8 @@ int ceph_debugfs_client_init(struct ceph_client *client)
        int ret = 0;
        char name[80];
 
-       snprintf(name, sizeof(name), FSID_FORMAT ".client%lld",
-                PR_FSID(&client->fsid), client->monc.auth->global_id);
+       snprintf(name, sizeof(name), "%pU.client%lld", &client->fsid,
+                client->monc.auth->global_id);
 
        client->debugfs_dir = debugfs_create_dir(name, ceph_debugfs_dir);
        if (!client->debugfs_dir)
@@ -432,11 +432,12 @@ int ceph_debugfs_client_init(struct ceph_client *client)
        if (!client->debugfs_caps)
                goto out;
 
-       client->debugfs_congestion_kb = debugfs_create_file("writeback_congestion_kb",
-                                                  0600,
-                                                  client->debugfs_dir,
-                                                  client,
-                                                  &congestion_kb_fops);
+       client->debugfs_congestion_kb =
+               debugfs_create_file("writeback_congestion_kb",
+                                   0600,
+                                   client->debugfs_dir,
+                                   client,
+                                   &congestion_kb_fops);
        if (!client->debugfs_congestion_kb)
                goto out;
 
@@ -466,7 +467,7 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
        debugfs_remove(client->debugfs_dir);
 }
 
-#else  // CONFIG_DEBUG_FS
+#else  /* CONFIG_DEBUG_FS */
 
 int __init ceph_debugfs_init(void)
 {
@@ -486,4 +487,4 @@ void ceph_debugfs_client_cleanup(struct ceph_client *client)
 {
 }
 
-#endif  // CONFIG_DEBUG_FS
+#endif  /* CONFIG_DEBUG_FS */
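
%pU is the generic printk extension for 16-byte UUIDs, replacing the hand-rolled
FSID_FORMAT/PR_FSID pair; the debugfs directory name keeps its old shape:

    /* produces "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.client<global_id>" */
    snprintf(name, sizeof(name), "%pU.client%lld", &client->fsid,
             client->monc.auth->global_id);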
diff --git a/fs/ceph/decode.h b/fs/ceph/decode.h
index 65b3e022eaf5818d6514b770603d0f6552e2b6e6..3d25415afe6368debff12c405463e52c7683edc6 100644
@@ -99,11 +99,13 @@ static inline void ceph_encode_timespec(struct ceph_timespec *tv,
  */
 static inline void ceph_encode_addr(struct ceph_entity_addr *a)
 {
-       a->in_addr.ss_family = htons(a->in_addr.ss_family);
+       __be16 ss_family = htons(a->in_addr.ss_family);
+       a->in_addr.ss_family = *(__u16 *)&ss_family;
 }
 static inline void ceph_decode_addr(struct ceph_entity_addr *a)
 {
-       a->in_addr.ss_family = ntohs(a->in_addr.ss_family);
+       __be16 ss_family = *(__be16 *)&a->in_addr.ss_family;
+       a->in_addr.ss_family = ntohs(ss_family);
        WARN_ON(a->in_addr.ss_family == 512);
 }
 
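Context for the WARN_ON above: the wire format keeps ss_family in big-endian
order inside the sockaddr_storage. AF_INET is 2, and 2 byte-swapped in a 16-bit
field reads as 512, so a decoded family of 512 means the swap was missed or
applied twice. The new __be16 temporaries make the single conversion explicit
(and sparse-checkable):

    /* on a little-endian host */
    __u16 host_fam = AF_INET;          /* 2 */
    __be16 wire_fam = htons(host_fam); /* raw memory reads back as 512 */
    /* decode must apply ntohs() exactly once to recover AF_INET */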
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index f94ed3c7f6a5a862a3276b4b222231ab4c40e97f..67bbb41d5526fcbdfb74f48f0e261b59b957ef9c 100644
@@ -27,7 +27,7 @@
 
 const struct inode_operations ceph_dir_iops;
 const struct file_operations ceph_dir_fops;
-struct dentry_operations ceph_dentry_ops;
+const struct dentry_operations ceph_dentry_ops;
 
 /*
  * Initialize ceph dentry state.
@@ -94,6 +94,8 @@ static unsigned fpos_off(loff_t p)
  */
 static int __dcache_readdir(struct file *filp,
                            void *dirent, filldir_t filldir)
+               __releases(inode->i_lock)
+               __acquires(inode->i_lock)
 {
        struct inode *inode = filp->f_dentry->d_inode;
        struct ceph_file_info *fi = filp->private_data;
@@ -1239,16 +1241,16 @@ const struct inode_operations ceph_dir_iops = {
        .create = ceph_create,
 };
 
-struct dentry_operations ceph_dentry_ops = {
+const struct dentry_operations ceph_dentry_ops = {
        .d_revalidate = ceph_d_revalidate,
        .d_release = ceph_dentry_release,
 };
 
-struct dentry_operations ceph_snapdir_dentry_ops = {
+const struct dentry_operations ceph_snapdir_dentry_ops = {
        .d_revalidate = ceph_snapdir_d_revalidate,
        .d_release = ceph_dentry_release,
 };
 
-struct dentry_operations ceph_snap_dentry_ops = {
+const struct dentry_operations ceph_snap_dentry_ops = {
        .d_release = ceph_dentry_release,
 };
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 7c08698fad3ef6a3aa4502f7ff1b830cde33146f..8c044a4f045751c62420e67664705f981efe238a 100644
@@ -317,7 +317,7 @@ void ceph_release_page_vector(struct page **pages, int num_pages)
 /*
  * allocate a vector new pages
  */
-struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
+static struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags)
 {
        struct page **pages;
        int i;
@@ -665,7 +665,7 @@ more:
                 * throw out any page cache pages in this range. this
                 * may block.
                 */
-               truncate_inode_pages_range(inode->i_mapping, pos, 
+               truncate_inode_pages_range(inode->i_mapping, pos,
                                           (pos+len) | (PAGE_CACHE_SIZE-1));
        } else {
                pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
@@ -740,28 +740,32 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
                             unsigned long nr_segs, loff_t pos)
 {
        struct file *filp = iocb->ki_filp;
+       struct ceph_file_info *fi = filp->private_data;
        loff_t *ppos = &iocb->ki_pos;
        size_t len = iov->iov_len;
        struct inode *inode = filp->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       void *base = iov->iov_base;
+       void __user *base = iov->iov_base;
        ssize_t ret;
-       int got = 0;
+       int want, got = 0;
        int checkeof = 0, read = 0;
 
        dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
             inode, ceph_vinop(inode), pos, (unsigned)len, inode);
 again:
        __ceph_do_pending_vmtruncate(inode);
-       ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_CACHE,
-                           &got, -1);
+       if (fi->fmode & CEPH_FILE_MODE_LAZY)
+               want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
+       else
+               want = CEPH_CAP_FILE_CACHE;
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
        if (ret < 0)
                goto out;
        dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
             inode, ceph_vinop(inode), pos, (unsigned)len,
             ceph_cap_string(got));
 
-       if ((got & CEPH_CAP_FILE_CACHE) == 0 ||
+       if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_filp->f_flags & O_DIRECT) ||
            (inode->i_sb->s_flags & MS_SYNCHRONOUS))
                /* hmm, this isn't really async... */
@@ -807,11 +811,12 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
                       unsigned long nr_segs, loff_t pos)
 {
        struct file *file = iocb->ki_filp;
+       struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->osdc;
        loff_t endoff = pos + iov->iov_len;
-       int got = 0;
+       int want, got = 0;
        int ret, err;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
@@ -824,8 +829,11 @@ retry_snap:
        dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
             inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
             inode->i_size);
-       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
-                           &got, endoff);
+       if (fi->fmode & CEPH_FILE_MODE_LAZY)
+               want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+       else
+               want = CEPH_CAP_FILE_BUFFER;
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
        if (ret < 0)
                goto out;
 
@@ -833,7 +841,7 @@ retry_snap:
             inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
             ceph_cap_string(got));
 
-       if ((got & CEPH_CAP_FILE_BUFFER) == 0 ||
+       if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_filp->f_flags & O_DIRECT) ||
            (inode->i_sb->s_flags & MS_SYNCHRONOUS)) {
                ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
@@ -930,6 +938,8 @@ const struct file_operations ceph_file_fops = {
        .aio_write = ceph_aio_write,
        .mmap = ceph_mmap,
        .fsync = ceph_fsync,
+       .lock = ceph_lock,
+       .flock = ceph_flock,
        .splice_read = generic_file_splice_read,
        .splice_write = generic_file_splice_write,
        .unlocked_ioctl = ceph_ioctl,
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 389f9dbd9949557add69369e6fc4ba68e2c84e69..5d893d31e399b81f57543ede560dc2258f7400da 100644
@@ -442,8 +442,9 @@ int ceph_fill_file_size(struct inode *inode, int issued,
                         * the file is either opened or mmaped
                         */
                        if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
-                                     CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
-                                     CEPH_CAP_FILE_EXCL)) ||
+                                      CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
+                                      CEPH_CAP_FILE_EXCL|
+                                      CEPH_CAP_FILE_LAZYIO)) ||
                            mapping_mapped(inode->i_mapping) ||
                            __ceph_caps_file_wanted(ci)) {
                                ci->i_truncate_pending++;
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index d085f07756b42159c7c3b916b123bdeabbfc35b8..76e307d2aba16868c9b1e8136496d929e0cd971f 100644
@@ -143,6 +143,27 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
        return 0;
 }
 
+static long ceph_ioctl_lazyio(struct file *file)
+{
+       struct ceph_file_info *fi = file->private_data;
+       struct inode *inode = file->f_dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
+               spin_lock(&inode->i_lock);
+               ci->i_nr_by_mode[fi->fmode]--;
+               fi->fmode |= CEPH_FILE_MODE_LAZY;
+               ci->i_nr_by_mode[fi->fmode]++;
+               spin_unlock(&inode->i_lock);
+               dout("ioctl_layzio: file %p marked lazy\n", file);
+
+               ceph_check_caps(ci, 0, NULL);
+       } else {
+               dout("ioctl_layzio: file %p already lazy\n", file);
+       }
+       return 0;
+}
+
 long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
        dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg);
@@ -155,6 +176,9 @@ long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 
        case CEPH_IOC_GET_DATALOC:
                return ceph_ioctl_get_dataloc(file, (void __user *)arg);
+
+       case CEPH_IOC_LAZYIO:
+               return ceph_ioctl_lazyio(file);
        }
        return -ENOTTY;
 }
index 25e4f1a9d059dbf74e1dc10489ff93d74bc34924..88451a3b6857d14bc7b21f418bcd82e56e9fdc64 100644 (file)
@@ -37,4 +37,6 @@ struct ceph_ioctl_dataloc {
 #define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3,        \
                                   struct ceph_ioctl_dataloc)
 
+#define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
+
 #endif
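
A userspace sketch of exercising the new ioctl may help orient readers;
it is illustrative only (the include path and error handling are
assumptions, not part of the patch):

    /* Mark an open ceph file for lazy I/O.  Assumes CEPH_IOC_LAZYIO from
     * fs/ceph/ioctl.h above is visible to userspace somehow; the include
     * below is a placeholder for however that is arranged. */
    #include <fcntl.h>
    #include <stdio.h>
    #include <sys/ioctl.h>
    #include <unistd.h>
    #include "ioctl.h"                      /* CEPH_IOC_LAZYIO */

    int main(int argc, char **argv)
    {
            int fd;

            if (argc < 2)
                    return 1;
            fd = open(argv[1], O_RDWR);
            if (fd < 0 || ioctl(fd, CEPH_IOC_LAZYIO) < 0) {
                    perror("lazyio");
                    return 1;
            }
            close(fd);
            return 0;
    }
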
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
new file mode 100644 (file)
index 0000000..ae85af0
--- /dev/null
@@ -0,0 +1,256 @@
+#include "ceph_debug.h"
+
+#include <linux/file.h>
+#include <linux/namei.h>
+
+#include "super.h"
+#include "mds_client.h"
+#include "pagelist.h"
+
+/**
+ * Implement fcntl and flock locking functions.
+ */
+static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
+                            u64 pid, u64 pid_ns,
+                            int cmd, u64 start, u64 length, u8 wait)
+{
+       struct inode *inode = file->f_dentry->d_inode;
+       struct ceph_mds_client *mdsc =
+               &ceph_sb_to_client(inode->i_sb)->mdsc;
+       struct ceph_mds_request *req;
+       int err;
+
+       req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
+       if (IS_ERR(req))
+               return PTR_ERR(req);
+       req->r_inode = igrab(inode);
+
+       dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
+            "length: %llu, wait: %d, type`: %d", (int)lock_type,
+            (int)operation, pid, start, length, wait, cmd);
+
+       req->r_args.filelock_change.rule = lock_type;
+       req->r_args.filelock_change.type = cmd;
+       req->r_args.filelock_change.pid = cpu_to_le64(pid);
+       /* This should be adjusted, but I'm not sure if
+          namespaces actually get id numbers */
+       req->r_args.filelock_change.pid_namespace =
+               cpu_to_le64((u64)pid_ns);
+       req->r_args.filelock_change.start = cpu_to_le64(start);
+       req->r_args.filelock_change.length = cpu_to_le64(length);
+       req->r_args.filelock_change.wait = wait;
+
+       err = ceph_mdsc_do_request(mdsc, inode, req);
+       ceph_mdsc_put_request(req);
+       dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
+            "length: %llu, wait: %d, type`: %d err code %d", (int)lock_type,
+            (int)operation, pid, start, length, wait, cmd, err);
+       return err;
+}
+
+/**
+ * Attempt to set an fcntl lock.
+ * For now, this just goes out to the server. Later it may be more awesome.
+ */
+int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
+{
+       u64 length;
+       u8 lock_cmd;
+       int err;
+       u8 wait = 0;
+       u16 op = CEPH_MDS_OP_SETFILELOCK;
+
+       fl->fl_nspid = get_pid(task_tgid(current));
+       dout("ceph_lock, fl_pid:%d", fl->fl_pid);
+
+       /* set wait bit as appropriate, then encode the cmd as Ceph expects */
+       if (F_SETLKW == cmd)
+               wait = 1;
+       if (F_GETLK == cmd)
+               op = CEPH_MDS_OP_GETFILELOCK;
+
+       if (F_RDLCK == fl->fl_type)
+               lock_cmd = CEPH_LOCK_SHARED;
+       else if (F_WRLCK == fl->fl_type)
+               lock_cmd = CEPH_LOCK_EXCL;
+       else
+               lock_cmd = CEPH_LOCK_UNLOCK;
+
+       if (LLONG_MAX == fl->fl_end)
+               length = 0;
+       else
+               length = fl->fl_end - fl->fl_start + 1;
+
+       err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
+                               (u64)fl->fl_pid, (u64)fl->fl_nspid,
+                               lock_cmd, fl->fl_start,
+                               length, wait);
+       if (!err) {
+               dout("mds locked, locking locally");
+               err = posix_lock_file(file, fl, NULL);
+               if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
+                       /* undo! This should only happen if the kernel detects
+                        * local deadlock. */
+                       ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
+                                         (u64)fl->fl_pid, (u64)fl->fl_nspid,
+                                         CEPH_LOCK_UNLOCK, fl->fl_start,
+                                         length, 0);
+                       dout("got %d on posix_lock_file, undid lock", err);
+               }
+       } else {
+               dout("mds returned error code %d", err);
+       }
+       return err;
+}
+
+int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
+{
+       u64 length;
+       u8 lock_cmd;
+       int err;
+       u8 wait = 1;
+
+       fl->fl_nspid = get_pid(task_tgid(current));
+       dout("ceph_flock, fl_pid:%d", fl->fl_pid);
+
+       /* set wait bit, then clear it out of cmd */
+       if (cmd & LOCK_NB)
+               wait = 0;
+       cmd = cmd & (LOCK_SH | LOCK_EX | LOCK_UN);
+       /* map the command to the lock type Ceph wants to see:
+          shared lock, exclusive lock, or unlock */
+       if (LOCK_SH == cmd)
+               lock_cmd = CEPH_LOCK_SHARED;
+       else if (LOCK_EX == cmd)
+               lock_cmd = CEPH_LOCK_EXCL;
+       else
+               lock_cmd = CEPH_LOCK_UNLOCK;
+       /* mds requires start and length rather than start and end */
+       if (LLONG_MAX == fl->fl_end)
+               length = 0;
+       else
+               length = fl->fl_end - fl->fl_start + 1;
+
+       err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
+                               file, (u64)fl->fl_pid, (u64)fl->fl_nspid,
+                               lock_cmd, fl->fl_start,
+                               length, wait);
+       if (!err) {
+               err = flock_lock_file_wait(file, fl);
+               if (err) {
+                       ceph_lock_message(CEPH_LOCK_FLOCK,
+                                         CEPH_MDS_OP_SETFILELOCK,
+                                         file, (u64)fl->fl_pid,
+                                         (u64)fl->fl_nspid,
+                                         CEPH_LOCK_UNLOCK, fl->fl_start,
+                                         length, 0);
+                       dout("got %d on flock_lock_file_wait, undid lock", err);
+               }
+       } else {
+               dout("mds error code %d", err);
+       }
+       return err;
+}
+
+/**
+ * Must be called with BKL already held. Fills in the passed
+ * counter variables, so you can prepare pagelist metadata before calling
+ * ceph_encode_locks.
+ */
+void ceph_count_locks(struct inode *inode, int *fcntl_count, int *flock_count)
+{
+       struct file_lock *lock;
+
+       *fcntl_count = 0;
+       *flock_count = 0;
+
+       for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
+               if (lock->fl_flags & FL_POSIX)
+                       ++(*fcntl_count);
+               else if (lock->fl_flags & FL_FLOCK)
+                       ++(*flock_count);
+       }
+       dout("counted %d flock locks and %d fcntl locks",
+            *flock_count, *fcntl_count);
+}
+
+/**
+ * Encode the flock and fcntl locks for the given inode into the pagelist.
+ * Format is: #fcntl locks, sequential fcntl locks, #flock locks,
+ * sequential flock locks.
+ * Must be called with the BKL already held, and the lock counts should
+ * have been gathered under the same lock-holding window.
+ */
+int ceph_encode_locks(struct inode *inode, struct ceph_pagelist *pagelist,
+                     int num_fcntl_locks, int num_flock_locks)
+{
+       struct file_lock *lock;
+       struct ceph_filelock cephlock;
+       __le32 nlocks;
+       int err = 0;
+
+       dout("encoding %d flock and %d fcntl locks", num_flock_locks,
+            num_fcntl_locks);
+       err = ceph_pagelist_append(pagelist, &num_fcntl_locks, sizeof(u32));
+       if (err)
+               goto fail;
+       for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
+               if (lock->fl_flags & FL_POSIX) {
+                       err = lock_to_ceph_filelock(lock, &cephlock);
+                       if (err)
+                               goto fail;
+                       err = ceph_pagelist_append(pagelist, &cephlock,
+                                          sizeof(struct ceph_filelock));
+               }
+               if (err)
+                       goto fail;
+       }
+
+       err = ceph_pagelist_append(pagelist, &num_flock_locks, sizeof(u32));
+       if (err)
+               goto fail;
+       for (lock = inode->i_flock; lock != NULL; lock = lock->fl_next) {
+               if (lock->fl_flags & FL_FLOCK) {
+                       err = lock_to_ceph_filelock(lock, &cephlock);
+                       if (err)
+                               goto fail;
+                       err = ceph_pagelist_append(pagelist, &cephlock,
+                                          sizeof(struct ceph_filelock));
+               }
+               if (err)
+                       goto fail;
+       }
+fail:
+       return err;
+}
+
+/*
+ * Given a pointer to a lock, convert it to a ceph filelock
+ */
+int lock_to_ceph_filelock(struct file_lock *lock,
+                         struct ceph_filelock *cephlock)
+{
+       int err = 0;
+
+       cephlock->start = cpu_to_le64(lock->fl_start);
+       cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
+       cephlock->client = cpu_to_le64(0);
+       cephlock->pid = cpu_to_le64(lock->fl_pid);
+       cephlock->pid_namespace = cpu_to_le64((u64)lock->fl_nspid);
+
+       switch (lock->fl_type) {
+       case F_RDLCK:
+               cephlock->type = CEPH_LOCK_SHARED;
+               break;
+       case F_WRLCK:
+               cephlock->type = CEPH_LOCK_EXCL;
+               break;
+       case F_UNLCK:
+               cephlock->type = CEPH_LOCK_UNLOCK;
+               break;
+       default:
+               dout("Have unknown lock type %d", lock->fl_type);
+               err = -EINVAL;
+       }
+
+       return err;
+}
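
To make the on-wire blob concrete, here is a sketch (not part of the
patch) of how a consumer could walk what ceph_encode_locks() emits,
using the kernel's types and endian helpers; bounds checking is omitted
for brevity:

    /* Layout: __le32 fcntl-lock count, that many struct ceph_filelock
     * records, then a __le32 flock count and its records. */
    static u64 walk_lock_blob(const void *buf)
    {
            const u8 *p = buf;
            u64 total = 0;
            u32 i, pass, n;

            for (pass = 0; pass < 2; pass++) {  /* fcntl, then flock */
                    n = le32_to_cpu(*(const __le32 *)p);
                    p += sizeof(__le32);
                    for (i = 0; i < n; i++) {
                            const struct ceph_filelock *fl =
                                    (const void *)p;

                            total += le64_to_cpu(fl->length);
                            p += sizeof(*fl);
                    }
            }
            return total;   /* sum of lock extents, just to use the data */
    }
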
index dd440bd438a930a5dbd30cfa5e6ea63ae24f5947..a75ddbf9fe3743973c6fd02724f005b8504c3738 100644 (file)
@@ -3,6 +3,7 @@
 #include <linux/wait.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
+#include <linux/smp_lock.h>
 
 #include "mds_client.h"
 #include "mon_client.h"
  * are no longer valid.
  */
 
+struct ceph_reconnect_state {
+       struct ceph_pagelist *pagelist;
+       bool flock;
+};
+
 static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
 
@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_request_session(req);
-       ceph_unreserve_caps(&req->r_caps_reservation);
+       ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
 }
 
@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
 {
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
-               ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+               ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+                                 req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        __insert_request(mdsc, req);
@@ -703,6 +710,51 @@ static int __open_session(struct ceph_mds_client *mdsc,
        return 0;
 }
 
+/*
+ * open sessions for any export targets for the given mds
+ *
+ * called under mdsc->mutex
+ */
+static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
+                                         struct ceph_mds_session *session)
+{
+       struct ceph_mds_info *mi;
+       struct ceph_mds_session *ts;
+       int i, mds = session->s_mds;
+       int target;
+
+       if (mds >= mdsc->mdsmap->m_max_mds)
+               return;
+       mi = &mdsc->mdsmap->m_info[mds];
+       dout("open_export_target_sessions for mds%d (%d targets)\n",
+            session->s_mds, mi->num_export_targets);
+
+       for (i = 0; i < mi->num_export_targets; i++) {
+               target = mi->export_targets[i];
+               ts = __ceph_lookup_mds_session(mdsc, target);
+               if (!ts) {
+                       ts = register_session(mdsc, target);
+                       if (IS_ERR(ts))
+                               return;
+               }
+               if (ts->s_state == CEPH_MDS_SESSION_NEW ||
+                   ts->s_state == CEPH_MDS_SESSION_CLOSING)
+                       __open_session(mdsc, ts);
+               else
+                       dout(" mds%d target mds%d %p is %s\n", session->s_mds,
+                            target, ts, session_state_name(ts->s_state));
+               ceph_put_mds_session(ts);
+       }
+}
+
+void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+                                          struct ceph_mds_session *session)
+{
+       mutex_lock(&mdsc->mutex);
+       __open_export_target_sessions(mdsc, session);
+       mutex_unlock(&mdsc->mutex);
+}
+
 /*
  * session caps
  */
@@ -764,7 +816,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                        last_inode = NULL;
                }
                if (old_cap) {
-                       ceph_put_cap(old_cap);
+                       ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }
 
@@ -793,7 +845,7 @@ out:
        if (last_inode)
                iput(last_inode);
        if (old_cap)
-               ceph_put_cap(old_cap);
+               ceph_put_cap(session->s_mdsc, old_cap);
 
        return ret;
 }
@@ -1067,15 +1119,16 @@ static int trim_caps(struct ceph_mds_client *mdsc,
  * Called under s_mutex.
  */
 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-                         struct ceph_mds_session *session,
-                         int extra)
+                         struct ceph_mds_session *session)
 {
-       struct ceph_msg *msg;
+       struct ceph_msg *msg, *partial = NULL;
        struct ceph_mds_cap_release *head;
        int err = -ENOMEM;
+       int extra = mdsc->client->mount_args->cap_release_safety;
+       int num;
 
-       if (extra < 0)
-               extra = mdsc->client->mount_args->cap_release_safety;
+       dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
+            extra);
 
        spin_lock(&session->s_cap_lock);
 
@@ -1084,9 +1137,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                                       struct ceph_msg,
                                 list_head);
                head = msg->front.iov_base;
-               extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
+               num = le32_to_cpu(head->num);
+               if (num) {
+                       dout(" partial %p with (%d/%d)\n", msg, num,
+                            (int)CEPH_CAPS_PER_RELEASE);
+                       extra += CEPH_CAPS_PER_RELEASE - num;
+                       partial = msg;
+               }
        }
-
        while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                spin_unlock(&session->s_cap_lock);
                msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
@@ -1103,19 +1161,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
        }
 
-       if (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg,
-                                      list_head);
-               head = msg->front.iov_base;
-               if (head->num) {
-                       dout(" queueing non-full %p (%d)\n", msg,
-                            le32_to_cpu(head->num));
-                       list_move_tail(&msg->list_head,
-                                     &session->s_cap_releases_done);
-                       session->s_num_cap_releases -=
-                               CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
-               }
+       if (partial) {
+               head = partial->front.iov_base;
+               num = le32_to_cpu(head->num);
+               dout(" queueing partial %p with %d/%d\n", partial, num,
+                    (int)CEPH_CAPS_PER_RELEASE);
+               list_move_tail(&partial->list_head,
+                              &session->s_cap_releases_done);
+               session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
        }
        err = 0;
        spin_unlock(&session->s_cap_lock);
@@ -1250,6 +1303,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
                return ERR_PTR(-ENOMEM);
 
        mutex_init(&req->r_fill_mutex);
+       req->r_mdsc = mdsc;
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1580,6 +1634,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
        req->r_mds = mds;
        req->r_attempts++;
+       if (req->r_inode) {
+               struct ceph_cap *cap =
+                       ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+               if (cap)
+                       req->r_sent_on_mseq = cap->mseq;
+               else
+                       req->r_sent_on_mseq = -1;
+       }
        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
@@ -1914,21 +1977,40 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        result = le32_to_cpu(head->result);
 
        /*
-        * Tolerate 2 consecutive ESTALEs from the same mds.
-        * FIXME: we should be looking at the cap migrate_seq.
+        * Handle an ESTALE:
+        * if we're not talking to the authority, send to them;
+        * if the authority has changed while we weren't looking,
+        * send to the new authority.
+        * Otherwise we just have to return an ESTALE.
         */
        if (result == -ESTALE) {
-               req->r_direct_mode = USE_AUTH_MDS;
-               req->r_num_stale++;
-               if (req->r_num_stale <= 2) {
+               dout("got ESTALE on request %llu", req->r_tid);
+               if (!req->r_inode) {
+                       /* do nothing; not an authority problem */
+               } else if (req->r_direct_mode != USE_AUTH_MDS) {
+                       dout("not using auth, setting for that now");
+                       req->r_direct_mode = USE_AUTH_MDS;
                        __do_request(mdsc, req);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
+               } else {
+                       struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+                       struct ceph_cap *cap =
+                               ceph_get_cap_for_mds(ci, req->r_mds);
+
+                       dout("already using auth");
+                       if ((!cap || cap != ci->i_auth_cap) ||
+                           (cap->mseq != req->r_sent_on_mseq)) {
+                               dout("but cap changed, so resending");
+                               __do_request(mdsc, req);
+                               mutex_unlock(&mdsc->mutex);
+                               goto out;
+                       }
                }
-       } else {
-               req->r_num_stale = 0;
+               dout("have to return ESTALE on request %llu", req->r_tid);
        }
 
        if (head->safe) {
                req->r_got_safe = true;
                __unregister_request(mdsc, req);
@@ -1985,7 +2067,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        if (err == 0) {
                if (result == 0 && rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
-               ceph_unreserve_caps(&req->r_caps_reservation);
+               ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
        }
        mutex_unlock(&req->r_fill_mutex);
 
@@ -2005,7 +2087,7 @@ out_err:
        }
        mutex_unlock(&mdsc->mutex);
 
-       ceph_add_cap_releases(mdsc, req->r_session, -1);
+       ceph_add_cap_releases(mdsc, req->r_session);
        mutex_unlock(&session->s_mutex);
 
        /* kick calling process */
@@ -2193,9 +2275,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                          void *arg)
 {
-       struct ceph_mds_cap_reconnect rec;
+       union {
+               struct ceph_mds_cap_reconnect v2;
+               struct ceph_mds_cap_reconnect_v1 v1;
+       } rec;
+       size_t reclen;
        struct ceph_inode_info *ci;
-       struct ceph_pagelist *pagelist = arg;
+       struct ceph_reconnect_state *recon_state = arg;
+       struct ceph_pagelist *pagelist = recon_state->pagelist;
        char *path;
        int pathlen, err;
        u64 pathbase;
@@ -2228,17 +2315,44 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        spin_lock(&inode->i_lock);
        cap->seq = 0;        /* reset cap seq */
        cap->issue_seq = 0;  /* and issue_seq */
-       rec.cap_id = cpu_to_le64(cap->cap_id);
-       rec.pathbase = cpu_to_le64(pathbase);
-       rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
-       rec.issued = cpu_to_le32(cap->issued);
-       rec.size = cpu_to_le64(inode->i_size);
-       ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
-       ceph_encode_timespec(&rec.atime, &inode->i_atime);
-       rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+
+       if (recon_state->flock) {
+               rec.v2.cap_id = cpu_to_le64(cap->cap_id);
+               rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+               rec.v2.issued = cpu_to_le32(cap->issued);
+               rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+               rec.v2.pathbase = cpu_to_le64(pathbase);
+               rec.v2.flock_len = 0;
+               reclen = sizeof(rec.v2);
+       } else {
+               rec.v1.cap_id = cpu_to_le64(cap->cap_id);
+               rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+               rec.v1.issued = cpu_to_le32(cap->issued);
+               rec.v1.size = cpu_to_le64(inode->i_size);
+               ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
+               ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
+               rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+               rec.v1.pathbase = cpu_to_le64(pathbase);
+               reclen = sizeof(rec.v1);
+       }
        spin_unlock(&inode->i_lock);
 
-       err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+       if (recon_state->flock) {
+               int num_fcntl_locks, num_flock_locks;
+
+               lock_kernel();
+               ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+               rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
+                                   (num_fcntl_locks+num_flock_locks) *
+                                   sizeof(struct ceph_filelock));
+
+               err = ceph_pagelist_append(pagelist, &rec, reclen);
+               if (!err)
+                       err = ceph_encode_locks(inode, pagelist,
+                                               num_fcntl_locks,
+                                               num_flock_locks);
+               unlock_kernel();
+       } else {
+               err = ceph_pagelist_append(pagelist, &rec, reclen);
+       }
 
 out:
        kfree(path);
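
For orientation, one v2 reconnect entry as assembled above works out to
the following (field names per the union and the flock_len arithmetic;
this describes the existing encoding, not new code):

    /*
     * struct ceph_mds_cap_reconnect    cap_id, wanted, issued,
     *                                  snaprealm, pathbase, flock_len
     * __le32                           num_fcntl_locks
     * struct ceph_filelock             x num_fcntl_locks
     * __le32                           num_flock_locks
     * struct ceph_filelock             x num_flock_locks
     *
     * flock_len covers everything after the fixed record, hence the
     * 2*sizeof(u32) + n*sizeof(struct ceph_filelock) above.
     */
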
@@ -2267,6 +2381,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        int mds = session->s_mds;
        int err = -ENOMEM;
        struct ceph_pagelist *pagelist;
+       struct ceph_reconnect_state recon_state;
 
        pr_info("mds%d reconnect start\n", mds);
 
@@ -2301,7 +2416,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
        if (err)
                goto fail;
-       err = iterate_session_caps(session, encode_caps_cb, pagelist);
+
+       recon_state.pagelist = pagelist;
+       recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+       err = iterate_session_caps(session, encode_caps_cb, &recon_state);
        if (err < 0)
                goto fail;
 
@@ -2326,6 +2444,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        }
 
        reply->pagelist = pagelist;
+       if (recon_state.flock)
+               reply->hdr.version = cpu_to_le16(2);
        reply->hdr.data_len = cpu_to_le32(pagelist->length);
        reply->nr_pages = calc_pages_for(0, pagelist->length);
        ceph_con_send(&session->s_con, reply);
@@ -2376,9 +2496,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                oldstate = ceph_mdsmap_get_state(oldmap, i);
                newstate = ceph_mdsmap_get_state(newmap, i);
 
-               dout("check_new_map mds%d state %s -> %s (session %s)\n",
+               dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
                     i, ceph_mds_state_name(oldstate),
+                    ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
                     ceph_mds_state_name(newstate),
+                    ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
                     session_state_name(s->s_state));
 
                if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2428,6 +2550,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                        wake_up_session_caps(s, 1);
                }
        }
+
+       for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+               s = mdsc->sessions[i];
+               if (!s)
+                       continue;
+               if (!ceph_mdsmap_is_laggy(newmap, i))
+                       continue;
+               if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+                   s->s_state == CEPH_MDS_SESSION_HUNG ||
+                   s->s_state == CEPH_MDS_SESSION_CLOSING) {
+                       dout(" connecting to export targets of laggy mds%d\n",
+                            i);
+                       __open_export_target_sessions(mdsc, s);
+               }
+       }
 }
 
 
@@ -2715,7 +2852,7 @@ static void delayed_work(struct work_struct *work)
                        send_renew_caps(mdsc, s);
                else
                        ceph_con_keepalive(&s->s_con);
-               ceph_add_cap_releases(mdsc, s, -1);
+               ceph_add_cap_releases(mdsc, s);
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG)
                        ceph_send_cap_releases(mdsc, s);
@@ -2764,6 +2901,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        spin_lock_init(&mdsc->dentry_lru_lock);
        INIT_LIST_HEAD(&mdsc->dentry_lru);
 
+       ceph_caps_init(mdsc);
+       ceph_adjust_min_caps(mdsc, client->min_caps);
+
        return 0;
 }
 
@@ -2959,6 +3099,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
        if (mdsc->mdsmap)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
+       ceph_caps_finalize(mdsc);
 }
 
 
index 952410c60d093e7ba5c39ed630c6283664d90c5a..ab7e89f5e344b14d44500866625f378a2c849d57 100644 (file)
@@ -151,6 +151,7 @@ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
 struct ceph_mds_request {
        u64 r_tid;                   /* transaction id */
        struct rb_node r_node;
+       struct ceph_mds_client *r_mdsc;
 
        int r_op;                    /* mds op code */
        int r_mds;
@@ -207,8 +208,8 @@ struct ceph_mds_request {
 
        int               r_attempts;   /* resend attempts */
        int               r_num_fwd;    /* number of forward attempts */
-       int               r_num_stale;
        int               r_resend_mds; /* mds to resend to next, if any*/
+       u32               r_sent_on_mseq; /* cap mseq request was sent at */
 
        struct kref       r_kref;
        struct list_head  r_wait;
@@ -267,6 +268,27 @@ struct ceph_mds_client {
        spinlock_t        cap_dirty_lock;   /* protects above items */
        wait_queue_head_t cap_flushing_wq;
 
+       /*
+        * Cap reservations
+        *
+        * Maintain a global pool of preallocated struct ceph_cap objects,
+        * referenced by struct ceph_cap_reservation contexts.  This ensures
+        * that we preallocate the memory needed to successfully process an
+        * MDS response.  (If an MDS sends us cap information and we fail to
+        * process it, we will have problems due to the client and MDS being
+        * out of sync.)
+        *
+        * Reservations are 'owned' by a ceph_cap_reservation context.
+        */
+       spinlock_t      caps_list_lock;
+       struct          list_head caps_list; /* unused (reserved or
+                                               unreserved) */
+       int             caps_total_count;    /* total caps allocated */
+       int             caps_use_count;      /* in use */
+       int             caps_reserve_count;  /* unused, reserved */
+       int             caps_avail_count;    /* unused, unreserved */
+       int             caps_min_count;      /* keep at least this many
+                                               (unreserved) */
+
 #ifdef CONFIG_DEBUG_FS
        struct dentry     *debugfs_file;
 #endif
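
The reservation comment above boils down to a short lifecycle; a sketch
using the entry points visible at the call sites in this diff (the
middle step paraphrases the comment, and error handling is elided):

    static void example_caps_rsv(struct ceph_mds_client *mdsc, int need)
    {
            struct ceph_cap_reservation rsv = { 0 };

            ceph_reserve_caps(mdsc, &rsv, need);  /* before sending RPC */
            /* ... process the MDS reply; new caps are drawn from rsv ... */
            ceph_unreserve_caps(mdsc, &rsv);      /* return the surplus */
    }
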
@@ -324,8 +346,7 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
 }
 
 extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-                                struct ceph_mds_session *session,
-                                int extra);
+                                struct ceph_mds_session *session);
 extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
                                   struct ceph_mds_session *session);
 
@@ -343,4 +364,7 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
 extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
                                 struct ceph_msg *msg);
 
+extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+                                         struct ceph_mds_session *session);
+
 #endif
index c4c498e6dfefe50668f0575df203e11b93e6294c..040be6d1150be5ace2be71e955ac3b3525b8fd76 100644 (file)
@@ -85,6 +85,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
                struct ceph_entity_addr addr;
                u32 num_export_targets;
                void *pexport_targets = NULL;
+               struct ceph_timespec laggy_since;
 
                ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad);
                global_id = ceph_decode_64(p);
@@ -103,7 +104,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
                state_seq = ceph_decode_64(p);
                ceph_decode_copy(p, &addr, sizeof(addr));
                ceph_decode_addr(&addr);
-               *p += sizeof(struct ceph_timespec);
+               ceph_decode_copy(p, &laggy_since, sizeof(laggy_since));
                *p += sizeof(u32);
                ceph_decode_32_safe(p, end, namelen, bad);
                *p += namelen;
@@ -122,6 +123,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
                        m->m_info[mds].global_id = global_id;
                        m->m_info[mds].state = state;
                        m->m_info[mds].addr = addr;
+                       m->m_info[mds].laggy =
+                               (laggy_since.tv_sec != 0 ||
+                                laggy_since.tv_nsec != 0);
                        m->m_info[mds].num_export_targets = num_export_targets;
                        if (num_export_targets) {
                                m->m_info[mds].export_targets =
index eacc131aa5cb5c7e9c829560ee0c3987f198183e..4c5cb0880bba099d6bf4cdbb51d1f1bd8cbcb01c 100644 (file)
@@ -13,6 +13,7 @@ struct ceph_mds_info {
        struct ceph_entity_addr addr;
        s32 state;
        int num_export_targets;
+       bool laggy;
        u32 *export_targets;
 };
 
@@ -47,6 +48,13 @@ static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
        return m->m_info[w].state;
 }
 
+static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
+{
+       if (w >= 0 && w < m->m_max_mds)
+               return m->m_info[w].laggy;
+       return false;
+}
+
 extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
 extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end);
 extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
index 15167b2daa5562c3a093ba03fa14932657718551..2502d76fcec175ddb8500bcd1863c52add1208e5 100644 (file)
@@ -108,7 +108,7 @@ void ceph_msgr_exit(void)
        destroy_workqueue(ceph_msgr_wq);
 }
 
-void ceph_msgr_flush()
+void ceph_msgr_flush(void)
 {
        flush_workqueue(ceph_msgr_wq);
 }
@@ -647,7 +647,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
             con->connect_seq, global_seq, proto);
 
-       con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED_CLIENT);
+       con->out_connect.features = cpu_to_le64(CEPH_FEATURE_SUPPORTED);
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1081,11 +1081,11 @@ static int process_banner(struct ceph_connection *con)
                   sizeof(con->peer_addr)) != 0 &&
            !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
              con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-               pr_warning("wrong peer, want %s/%lld, got %s/%lld\n",
+               pr_warning("wrong peer, want %s/%d, got %s/%d\n",
                           pr_addr(&con->peer_addr.in_addr),
-                          le64_to_cpu(con->peer_addr.nonce),
+                          (int)le32_to_cpu(con->peer_addr.nonce),
                           pr_addr(&con->actual_peer_addr.in_addr),
-                          le64_to_cpu(con->actual_peer_addr.nonce));
+                          (int)le32_to_cpu(con->actual_peer_addr.nonce));
                con->error_msg = "wrong peer at address";
                return -1;
        }
@@ -1123,8 +1123,8 @@ static void fail_protocol(struct ceph_connection *con)
 
 static int process_connect(struct ceph_connection *con)
 {
-       u64 sup_feat = CEPH_FEATURE_SUPPORTED_CLIENT;
-       u64 req_feat = CEPH_FEATURE_REQUIRED_CLIENT;
+       u64 sup_feat = CEPH_FEATURE_SUPPORTED;
+       u64 req_feat = CEPH_FEATURE_REQUIRED;
        u64 server_feat = le64_to_cpu(con->in_reply.features);
 
        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
@@ -1302,8 +1302,8 @@ static void process_ack(struct ceph_connection *con)
 
 
 static int read_partial_message_section(struct ceph_connection *con,
-                                       struct kvec *section, unsigned int sec_len,
-                                       u32 *crc)
+                                       struct kvec *section,
+                                       unsigned int sec_len, u32 *crc)
 {
        int left;
        int ret;
@@ -1434,7 +1434,8 @@ static int read_partial_message(struct ceph_connection *con)
 
        /* middle */
        if (m->middle) {
-               ret = read_partial_message_section(con, &m->middle->vec, middle_len,
+               ret = read_partial_message_section(con, &m->middle->vec,
+                                                  middle_len,
                                                   &con->in_middle_crc);
                if (ret <= 0)
                        return ret;
@@ -1920,7 +1921,7 @@ out:
        /*
         * in case we faulted due to authentication, invalidate our
         * current tickets so that we can get new ones.
-         */
+        */
        if (con->auth_retry && con->ops->invalidate_authorizer) {
                dout("calling invalidate_authorizer()\n");
                con->ops->invalidate_authorizer(con);
index 54fe01c50706a935da2a9611677320bebae8b967..b2a5a3e4a671c336fc7495d473792ddc25f71110 100644 (file)
@@ -349,7 +349,7 @@ out:
 }
 
 /*
- * statfs
+ * generic requests (e.g., statfs, poolop)
  */
 static struct ceph_mon_generic_request *__lookup_generic_req(
        struct ceph_mon_client *monc, u64 tid)
@@ -442,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
        return m;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+                             struct ceph_mon_generic_request *req)
+{
+       int err;
+
+       /* register request */
+       mutex_lock(&monc->mutex);
+       req->tid = ++monc->last_tid;
+       req->request->hdr.tid = cpu_to_le64(req->tid);
+       __insert_generic_request(monc, req);
+       monc->num_generic_requests++;
+       ceph_con_send(monc->con, ceph_msg_get(req->request));
+       mutex_unlock(&monc->mutex);
+
+       err = wait_for_completion_interruptible(&req->completion);
+
+       mutex_lock(&monc->mutex);
+       rb_erase(&req->node, &monc->generic_request_tree);
+       monc->num_generic_requests--;
+       mutex_unlock(&monc->mutex);
+
+       if (!err)
+               err = req->result;
+       return err;
+}
+
+/*
+ * statfs
+ */
 static void handle_statfs_reply(struct ceph_mon_client *monc,
                                struct ceph_msg *msg)
 {
@@ -468,7 +497,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
        return;
 
 bad:
-       pr_err("corrupt generic reply, no tid\n");
+       pr_err("corrupt generic reply, tid %llu\n", tid);
        ceph_msg_dump(msg);
 }
 
@@ -487,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 
        kref_init(&req->kref);
        req->buf = buf;
+       req->buf_len = sizeof(*buf);
        init_completion(&req->completion);
 
        err = -ENOMEM;
@@ -504,33 +534,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
        h->monhdr.session_mon_tid = 0;
        h->fsid = monc->monmap->fsid;
 
-       /* register request */
-       mutex_lock(&monc->mutex);
-       req->tid = ++monc->last_tid;
-       req->request->hdr.tid = cpu_to_le64(req->tid);
-       __insert_generic_request(monc, req);
-       monc->num_generic_requests++;
-       mutex_unlock(&monc->mutex);
+       err = do_generic_request(monc, req);
 
-       /* send request and wait */
-       ceph_con_send(monc->con, ceph_msg_get(req->request));
-       err = wait_for_completion_interruptible(&req->completion);
+out:
+       kref_put(&req->kref, release_generic_request);
+       return err;
+}
+
+/*
+ * pool ops
+ */
+static int get_poolop_reply_buf(const char *src, size_t src_len,
+                               char *dst, size_t dst_len)
+{
+       u32 buf_len;
+
+       if (src_len != sizeof(u32) + dst_len)
+               return -EINVAL;
+
+       buf_len = le32_to_cpu(*(__le32 *)src);
+       if (buf_len != dst_len)
+               return -EINVAL;
+
+       memcpy(dst, src + sizeof(u32), dst_len);
+       return 0;
+}
+
+static void handle_poolop_reply(struct ceph_mon_client *monc,
+                               struct ceph_msg *msg)
+{
+       struct ceph_mon_generic_request *req;
+       struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+       u64 tid = le64_to_cpu(msg->hdr.tid);
+
+       if (msg->front.iov_len < sizeof(*reply))
+               goto bad;
+       dout("handle_poolop_reply %p tid %llu\n", msg, tid);
 
        mutex_lock(&monc->mutex);
-       rb_erase(&req->node, &monc->generic_request_tree);
-       monc->num_generic_requests--;
+       req = __lookup_generic_req(monc, tid);
+       if (req) {
+               if (req->buf_len &&
+                   get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
+                                        msg->front.iov_len - sizeof(*reply),
+                                        req->buf, req->buf_len) < 0) {
+                       mutex_unlock(&monc->mutex);
+                       goto bad;
+               }
+               req->result = le32_to_cpu(reply->reply_code);
+               get_generic_request(req);
+       }
        mutex_unlock(&monc->mutex);
+       if (req) {
+               complete(&req->completion);
+               put_generic_request(req);
+       }
+       return;
 
-       if (!err)
-               err = req->result;
+bad:
+       pr_err("corrupt generic reply, tid %llu\n", tid);
+       ceph_msg_dump(msg);
+}
+
+/*
+ * Do a synchronous pool op.
+ */
+int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+                       u32 pool, u64 snapid,
+                       char *buf, int len)
+{
+       struct ceph_mon_generic_request *req;
+       struct ceph_mon_poolop *h;
+       int err;
+
+       req = kzalloc(sizeof(*req), GFP_NOFS);
+       if (!req)
+               return -ENOMEM;
+
+       kref_init(&req->kref);
+       req->buf = buf;
+       req->buf_len = len;
+       init_completion(&req->completion);
+
+       err = -ENOMEM;
+       req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+       if (!req->request)
+               goto out;
+       req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+       if (!req->reply)
+               goto out;
+
+       /* fill out request */
+       req->request->hdr.version = cpu_to_le16(2);
+       h = req->request->front.iov_base;
+       h->monhdr.have_version = 0;
+       h->monhdr.session_mon = cpu_to_le16(-1);
+       h->monhdr.session_mon_tid = 0;
+       h->fsid = monc->monmap->fsid;
+       h->pool = cpu_to_le32(pool);
+       h->op = cpu_to_le32(op);
+       h->auid = 0;
+       h->snapid = cpu_to_le64(snapid);
+       h->name_len = 0;
+
+       err = do_generic_request(monc, req);
 
 out:
        kref_put(&req->kref, release_generic_request);
        return err;
 }
 
+int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+                           u32 pool, u64 *snapid)
+{
+       return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP,
+                                  pool, 0, (char *)snapid, sizeof(*snapid));
+}
+
+int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+                           u32 pool, u64 snapid)
+{
+       return ceph_monc_do_poolop(monc, POOL_OP_DELETE_UNMANAGED_SNAP,
+                                  pool, snapid, NULL, 0);
+}
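
A usage sketch for the snapid helpers, assuming a live client (the pool
id is illustrative); the reply payload is the new snapid as a u64,
unwrapped by get_poolop_reply_buf() above:

    u64 snapid;
    int err;

    err = ceph_monc_create_snapid(&client->monc, 0, &snapid);
    if (!err) {
            dout("created unmanaged snap %llu\n", snapid);
            err = ceph_monc_delete_snapid(&client->monc, 0, snapid);
    }
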
+
 /*
- * Resend pending statfs requests.
+ * Resend pending generic requests.
  */
 static void __resend_generic_request(struct ceph_mon_client *monc)
 {
@@ -783,6 +914,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                handle_statfs_reply(monc, msg);
                break;
 
+       case CEPH_MSG_POOLOP_REPLY:
+               handle_poolop_reply(monc, msg);
+               break;
+
        case CEPH_MSG_MON_MAP:
                ceph_monc_handle_map(monc, msg);
                break;
@@ -820,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
                m = ceph_msg_get(monc->m_subscribe_ack);
                break;
+       case CEPH_MSG_POOLOP_REPLY:
        case CEPH_MSG_STATFS_REPLY:
                return get_generic_reply(con, hdr, skip);
        case CEPH_MSG_AUTH_REPLY:
index 174d794321d0af7cf709e4ff7395fedc381aba81..8e396f2c09637f29d0a82311244d036441a51c1c 100644 (file)
@@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
        struct rb_node node;
        int result;
        void *buf;
+       int buf_len;
        struct completion completion;
        struct ceph_msg *request;  /* original request */
        struct ceph_msg *reply;    /* and reply */
@@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
 
+extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+                                  u32 pool, u64 *snapid);
 
+extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+                                  u32 pool, u64 snapid);
 
 #endif
index 892a0298dfdfddc385873412894193237cdb32c2..680d3d648cac21bd9ed201a6c4bee1651a9df10e 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef __MSGR_H
-#define __MSGR_H
+#ifndef CEPH_MSGR_H
+#define CEPH_MSGR_H
 
 /*
  * Data types for message passing layer used by Ceph.
index e38522347898046fe778df23bdc467c5d4248363..bed6391e52c7109c966d5e642e472c665bd7d1f5 100644 (file)
@@ -1276,8 +1276,6 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 
        /* it may be a short read due to an object boundary */
        req->r_pages = pages;
-       num_pages = calc_pages_for(off, *plen);
-       req->r_num_pages = num_pages;
 
        dout("readpages  final extent is %llu~%llu (%d pages)\n",
             off, *plen, req->r_num_pages);
@@ -1319,7 +1317,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 
        /* it may be a short write due to an object boundary */
        req->r_pages = pages;
-       req->r_num_pages = calc_pages_for(off, len);
        dout("writepages %llu~%llu (%d pages)\n", off, len,
             req->r_num_pages);
 
@@ -1476,8 +1473,8 @@ static void put_osd_con(struct ceph_connection *con)
  * authentication
  */
 static int get_authorizer(struct ceph_connection *con,
-                         void **buf, int *len, int *proto,
-                         void **reply_buf, int *reply_len, int force_new)
+                         void **buf, int *len, int *proto,
+                         void **reply_buf, int *reply_len, int force_new)
 {
        struct ceph_osd *o = con->private;
        struct ceph_osd_client *osdc = o->o_osdc;
@@ -1497,7 +1494,7 @@ static int get_authorizer(struct ceph_connection *con,
                        &o->o_authorizer_reply_buf,
                        &o->o_authorizer_reply_buf_len);
                if (ret)
-               return ret;
+                       return ret;
        }
 
        *proto = ac->protocol;
index 416d46adbf87208544a1813ddfdadbabf4beb511..e31f118f1392988351257d2385863c37228bc7df 100644 (file)
@@ -424,12 +424,30 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi)
        kfree(pi);
 }
 
-void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
+static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
 {
+       unsigned n, m;
+
        ceph_decode_copy(p, &pi->v, sizeof(pi->v));
        calc_pg_masks(pi);
-       *p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64);
+
+       /* num_snaps * snap_info_t */
+       n = le32_to_cpu(pi->v.num_snaps);
+       while (n--) {
+               ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) +
+                                sizeof(struct ceph_timespec), bad);
+               *p += sizeof(u64) +       /* key */
+                       1 + sizeof(u64) + /* u8, snapid */
+                       sizeof(struct ceph_timespec);
+               ceph_decode_32_safe(p, end, m, bad);  /* snap name length */
+               ceph_decode_need(p, end, m, bad);
+               *p += m;                              /* skip snap name */
+       }
+
        *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2;
+       return 0;
+
+bad:
+       return -EINVAL;
 }
 
 static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
@@ -571,7 +589,9 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
                        kfree(pi);
                        goto bad;
                }
-               __decode_pool(p, pi);
+               err = __decode_pool(p, end, pi);
+               if (err < 0)
+                       goto bad;
                __insert_pg_pool(&map->pg_pools, pi);
        }
 
@@ -760,7 +780,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                        pi->id = pool;
                        __insert_pg_pool(&map->pg_pools, pi);
                }
-               __decode_pool(p, pi);
+               err = __decode_pool(p, end, pi);
+               if (err < 0)
+                       goto bad;
        }
        if (version >= 5 && __decode_pool_names(p, end, map) < 0)
                goto bad;
@@ -833,7 +855,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                                                node)->pgid, pgid) <= 0) {
                        struct ceph_pg_mapping *cur =
                                rb_entry(rbp, struct ceph_pg_mapping, node);
-                       
+
                        rbp = rb_next(rbp);
                        dout(" removed pg_temp %llx\n", *(u64 *)&cur->pgid);
                        rb_erase(&cur->node, &map->pg_temp);
@@ -1026,8 +1048,9 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
        ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset,
                                 pool->v.type, pool->v.size);
        if (ruleno < 0) {
-               pr_err("no crush rule pool %d type %d size %d\n",
-                      poolid, pool->v.type, pool->v.size);
+               pr_err("no crush rule pool %d ruleset %d type %d size %d\n",
+                      poolid, pool->v.crush_ruleset, pool->v.type,
+                      pool->v.size);
                return NULL;
        }
 
index 8fcc023056c7642787f33b1aeb35c93ce26492b0..6d5247f2e81b273538e78df5db1dc9204674a2d3 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef __RADOS_H
-#define __RADOS_H
+#ifndef CEPH_RADOS_H
+#define CEPH_RADOS_H
 
 /*
  * Data types for the Ceph distributed object storage layer RADOS
@@ -203,6 +203,7 @@ enum {
        CEPH_OSD_OP_TMAPGET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 12,
 
        CEPH_OSD_OP_CREATE  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13,
+       CEPH_OSD_OP_ROLLBACK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14,
 
        /** attrs **/
        /* read */
@@ -272,6 +273,10 @@ static inline int ceph_osd_op_mode_modify(int op)
        return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR;
 }
 
+/*
+ * Note that the following tmap stuff is also defined in ceph's
+ * librados.h; any modification here needs to be mirrored there.
+ */
 #define CEPH_OSD_TMAP_HDR 'h'
 #define CEPH_OSD_TMAP_SET 's'
 #define CEPH_OSD_TMAP_RM  'r'
@@ -297,6 +302,7 @@ enum {
        CEPH_OSD_FLAG_PARALLELEXEC = 512, /* execute op in parallel */
        CEPH_OSD_FLAG_PGOP = 1024,      /* pg op, no object */
        CEPH_OSD_FLAG_EXEC = 2048,      /* op may exec */
+       CEPH_OSD_FLAG_EXEC_PUBLIC = 4096, /* op may exec (public) */
 };
 
 enum {
@@ -350,6 +356,9 @@ struct ceph_osd_op {
                struct {
                        __le64 cookie, count;
                } __attribute__ ((packed)) pgls;
+               struct {
+                       __le64 snapid;
+               } __attribute__ ((packed)) snap;
        };
        __le32 payload_len;
 } __attribute__ ((packed));
index fa87f51e38e112a38f8a95d87ac4c96e02e7d7ef..9922628532b2c649dee6761710405adc552ca248 100644 (file)
@@ -2,6 +2,7 @@
 #include "ceph_debug.h"
 
 #include <linux/backing-dev.h>
+#include <linux/ctype.h>
 #include <linux/fs.h>
 #include <linux/inet.h>
 #include <linux/in6.h>
@@ -101,12 +102,21 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 }
 
 
-static int ceph_syncfs(struct super_block *sb, int wait)
+static int ceph_sync_fs(struct super_block *sb, int wait)
 {
-       dout("sync_fs %d\n", wait);
+       struct ceph_client *client = ceph_sb_to_client(sb);
+
+       if (!wait) {
+               dout("sync_fs (non-blocking)\n");
+               ceph_flush_dirty_caps(&client->mdsc);
+               dout("sync_fs (non-blocking) done\n");
+               return 0;
+       }
+
+       dout("sync_fs (blocking)\n");
        ceph_osdc_sync(&ceph_sb_to_client(sb)->osdc);
        ceph_mdsc_sync(&ceph_sb_to_client(sb)->mdsc);
-       dout("sync_fs %d done\n", wait);
+       dout("sync_fs (blocking) done\n");
        return 0;
 }
 
@@ -150,9 +160,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
        struct ceph_mount_args *args = client->mount_args;
 
        if (args->flags & CEPH_OPT_FSID)
-               seq_printf(m, ",fsidmajor=%llu,fsidminor%llu",
-                          le64_to_cpu(*(__le64 *)&args->fsid.fsid[0]),
-                          le64_to_cpu(*(__le64 *)&args->fsid.fsid[8]));
+               seq_printf(m, ",fsid=%pU", &args->fsid);
        if (args->flags & CEPH_OPT_NOSHARE)
                seq_puts(m, ",noshare");
        if (args->flags & CEPH_OPT_DIRSTAT)
@@ -279,7 +287,7 @@ static const struct super_operations ceph_super_ops = {
        .alloc_inode    = ceph_alloc_inode,
        .destroy_inode  = ceph_destroy_inode,
        .write_inode    = ceph_write_inode,
-       .sync_fs        = ceph_syncfs,
+       .sync_fs        = ceph_sync_fs,
        .put_super      = ceph_put_super,
        .show_options   = ceph_show_options,
        .statfs         = ceph_statfs,
@@ -322,9 +330,6 @@ const char *ceph_msg_type_name(int type)
  * mount options
  */
 enum {
-       Opt_fsidmajor,
-       Opt_fsidminor,
-       Opt_monport,
        Opt_wsize,
        Opt_rsize,
        Opt_osdtimeout,
@@ -339,6 +344,7 @@ enum {
        Opt_congestion_kb,
        Opt_last_int,
        /* int args above */
+       Opt_fsid,
        Opt_snapdirname,
        Opt_name,
        Opt_secret,
@@ -355,9 +361,6 @@ enum {
 };
 
 static match_table_t arg_tokens = {
-       {Opt_fsidmajor, "fsidmajor=%ld"},
-       {Opt_fsidminor, "fsidminor=%ld"},
-       {Opt_monport, "monport=%d"},
        {Opt_wsize, "wsize=%d"},
        {Opt_rsize, "rsize=%d"},
        {Opt_osdtimeout, "osdtimeout=%d"},
@@ -371,6 +374,7 @@ static match_table_t arg_tokens = {
        {Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
        {Opt_congestion_kb, "write_congestion_kb=%d"},
        /* int args above */
+       {Opt_fsid, "fsid=%s"},
        {Opt_snapdirname, "snapdirname=%s"},
        {Opt_name, "name=%s"},
        {Opt_secret, "secret=%s"},
@@ -386,6 +390,36 @@ static match_table_t arg_tokens = {
        {-1, NULL}
 };
 
+static int parse_fsid(const char *str, struct ceph_fsid *fsid)
+{
+       int i = 0;
+       char tmp[3];
+       int err = -EINVAL;
+       int d;
+
+       dout("parse_fsid '%s'\n", str);
+       tmp[2] = 0;
+       while (*str && i < 16) {
+               if (ispunct(*str)) {
+                       str++;
+                       continue;
+               }
+               if (!isxdigit(str[0]) || !isxdigit(str[1]))
+                       break;
+               tmp[0] = str[0];
+               tmp[1] = str[1];
+               if (sscanf(tmp, "%x", &d) < 1)
+                       break;
+               fsid->fsid[i] = d & 0xff;
+               i++;
+               str += 2;
+       }
+
+       if (i == 16)
+               err = 0;
+       dout("parse_fsid ret %d got fsid %pU", err, fsid);
+       return err;
+}
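
Illustrative inputs (values made up): punctuation is skipped, so both
the canonical and the bare form parse; anything short of 16 bytes fails:

    struct ceph_fsid fsid;

    parse_fsid("3f2e1db4-0a1b-4c2d-9e8f-123456789abc", &fsid); /* 0 */
    parse_fsid("3f2e1db40a1b4c2d9e8f123456789abc", &fsid);     /* 0 */
    parse_fsid("3f2e1db4", &fsid);                             /* -EINVAL */
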
 
 static struct ceph_mount_args *parse_mount_args(int flags, char *options,
                                                const char *dev_name,
@@ -469,12 +503,6 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
                        dout("got token %d\n", token);
                }
                switch (token) {
-               case Opt_fsidmajor:
-                       *(__le64 *)&args->fsid.fsid[0] = cpu_to_le64(intval);
-                       break;
-               case Opt_fsidminor:
-                       *(__le64 *)&args->fsid.fsid[8] = cpu_to_le64(intval);
-                       break;
                case Opt_ip:
                        err = ceph_parse_ips(argstr[0].from,
                                             argstr[0].to,
@@ -485,6 +513,11 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
                        args->flags |= CEPH_OPT_MYIP;
                        break;
 
+               case Opt_fsid:
+                       err = parse_fsid(argstr[0].from, &args->fsid);
+                       if (err == 0)
+                               args->flags |= CEPH_OPT_FSID;
+                       break;
                case Opt_snapdirname:
                        kfree(args->snapdir_name);
                        args->snapdir_name = kstrndup(argstr[0].from,
@@ -515,6 +548,9 @@ static struct ceph_mount_args *parse_mount_args(int flags, char *options,
                case Opt_osdkeepalivetimeout:
                        args->osd_keepalive_timeout = intval;
                        break;
+               case Opt_osd_idle_ttl:
+                       args->osd_idle_ttl = intval;
+                       break;
                case Opt_mount_timeout:
                        args->mount_timeout = intval;
                        break;
@@ -630,7 +666,6 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
 
        /* caps */
        client->min_caps = args->max_readdir;
-       ceph_adjust_min_caps(client->min_caps);
 
        /* subsystems */
        err = ceph_monc_init(&client->monc, client);
@@ -680,8 +715,6 @@ static void ceph_destroy_client(struct ceph_client *client)
 
        ceph_monc_stop(&client->monc);
 
-       ceph_adjust_min_caps(-client->min_caps);
-
        ceph_debugfs_client_cleanup(client);
        destroy_workqueue(client->wb_wq);
        destroy_workqueue(client->pg_inv_wq);
@@ -706,13 +739,13 @@ int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
 {
        if (client->have_fsid) {
                if (ceph_fsid_compare(&client->fsid, fsid)) {
-                       pr_err("bad fsid, had " FSID_FORMAT " got " FSID_FORMAT,
-                              PR_FSID(&client->fsid), PR_FSID(fsid));
+                       pr_err("bad fsid, had %pU got %pU\n",
+                              &client->fsid, fsid);
                        return -1;
                }
        } else {
-               pr_info("client%lld fsid " FSID_FORMAT "\n",
-                       client->monc.auth->global_id, PR_FSID(fsid));
+               pr_info("client%lld fsid %pU\n", client->monc.auth->global_id,
+                       fsid);
                memcpy(&client->fsid, fsid, sizeof(*fsid));
                ceph_debugfs_client_init(client);
                client->have_fsid = true;
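
Both messages rely on the %pU extension to the kernel's vsprintf, which formats a pointer to 16 bytes as a canonical UUID; this is what lets the hand-rolled FSID_FORMAT/PR_FSID macros be dropped from super.h further down. Illustrative use only (sketch, not an addition to the patch):

	/* emits e.g. "client4104 fsid 01234567-89ab-cdef-0123-456789abcdef" */
	pr_info("client%lld fsid %pU\n", client->monc.auth->global_id,
		&client->fsid);
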
@@ -1043,8 +1076,6 @@ static int __init init_ceph(void)
        if (ret)
                goto out_msgr;
 
-       ceph_caps_init();
-
        ret = register_filesystem(&ceph_fs_type);
        if (ret)
                goto out_icache;
@@ -1069,7 +1100,6 @@ static void __exit exit_ceph(void)
 {
        dout("exit_ceph\n");
        unregister_filesystem(&ceph_fs_type);
-       ceph_caps_finalize();
        destroy_caches();
        ceph_msgr_exit();
        ceph_debugfs_cleanup();
index 10a4a406e887506b4d104e8052f9dd50b07fba5f..2482d696f0de17f2fcccdedfc79d2ad4a14c6664 100644 (file)
 #define CEPH_BLOCK_SHIFT   20  /* 1 MB */
 #define CEPH_BLOCK         (1 << CEPH_BLOCK_SHIFT)
 
+/*
+ * Supported features
+ */
+#define CEPH_FEATURE_SUPPORTED (CEPH_FEATURE_NOSRCADDR | CEPH_FEATURE_FLOCK)
+#define CEPH_FEATURE_REQUIRED  CEPH_FEATURE_NOSRCADDR
+
 /*
  * mount options
  */
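
These masks are plain bit-ORs of feature flags negotiated with the servers (note the added parentheses around the OR, so the macro expands safely inside larger expressions). A sketch of how such masks are typically checked during a handshake (peer_features and the surrounding logic are assumed for illustration, not shown in this patch):

	__u64 peer_features;	/* as advertised by the peer */

	if ((peer_features & CEPH_FEATURE_REQUIRED) != CEPH_FEATURE_REQUIRED)
		return -1;	/* peer lacks a feature we require */
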
@@ -560,11 +566,13 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
 /* what the mds thinks we want */
 extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
 
-extern void ceph_caps_init(void);
-extern void ceph_caps_finalize(void);
-extern void ceph_adjust_min_caps(int delta);
-extern int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need);
-extern int ceph_unreserve_caps(struct ceph_cap_reservation *ctx);
+extern void ceph_caps_init(struct ceph_mds_client *mdsc);
+extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
+extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+                            struct ceph_cap_reservation *ctx, int need);
+extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+                              struct ceph_cap_reservation *ctx);
 extern void ceph_reservation_status(struct ceph_client *client,
                                    int *total, int *avail, int *used,
                                    int *reserved, int *min);
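
The cap-reservation API now threads a struct ceph_mds_client through every call, replacing module-global counters with per-client state (which is why the ceph_caps_init/ceph_adjust_min_caps call sites disappear from super.c above). A sketch of the call-site change this implies (the mdsc lookup is assumed from surrounding code, not shown in this hunk):

	/* was: ceph_reserve_caps(&rsv, need); */
	struct ceph_mds_client *mdsc = &ceph_sb_to_client(sb)->mdsc;

	ceph_reserve_caps(mdsc, &rsv, need);
	/* ... use the reserved caps ... */
	ceph_unreserve_caps(mdsc, &rsv);
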
@@ -738,13 +746,6 @@ extern struct kmem_cache *ceph_file_cachep;
 extern const char *ceph_msg_type_name(int type);
 extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
 
-#define FSID_FORMAT "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-" \
-       "%02x%02x%02x%02x%02x%02x"
-#define PR_FSID(f) (f)->fsid[0], (f)->fsid[1], (f)->fsid[2], (f)->fsid[3], \
-               (f)->fsid[4], (f)->fsid[5], (f)->fsid[6], (f)->fsid[7],    \
-               (f)->fsid[8], (f)->fsid[9], (f)->fsid[10], (f)->fsid[11],  \
-               (f)->fsid[12], (f)->fsid[13], (f)->fsid[14], (f)->fsid[15]
-
 /* inode.c */
 extern const struct inode_operations ceph_file_iops;
 
@@ -806,13 +807,16 @@ static inline void ceph_remove_cap(struct ceph_cap *cap)
        __ceph_remove_cap(cap);
        spin_unlock(&inode->i_lock);
 }
-extern void ceph_put_cap(struct ceph_cap *cap);
+extern void ceph_put_cap(struct ceph_mds_client *mdsc,
+                        struct ceph_cap *cap);
 
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, int datasync);
 extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
                                    struct ceph_mds_session *session);
+extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
+                                            int mds);
 extern int ceph_get_cap_mds(struct inode *inode);
 extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
@@ -857,7 +861,7 @@ extern void ceph_release_page_vector(struct page **pages, int num_pages);
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
 extern const struct inode_operations ceph_dir_iops;
-extern struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
+extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
        ceph_snapdir_dentry_ops;
 
 extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
@@ -888,6 +892,14 @@ extern void ceph_debugfs_cleanup(void);
 extern int ceph_debugfs_client_init(struct ceph_client *client);
 extern void ceph_debugfs_client_cleanup(struct ceph_client *client);
 
+/* locks.c */
+extern int ceph_lock(struct file *file, int cmd, struct file_lock *fl);
+extern int ceph_flock(struct file *file, int cmd, struct file_lock *fl);
+extern void ceph_count_locks(struct inode *inode, int *p_num, int *f_num);
+extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p,
+                            int p_locks, int f_locks);
+extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
+
 static inline struct inode *get_dentry_parent_inode(struct dentry *dentry)
 {
        if (dentry && dentry->d_parent)
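
With the new ceph_lock()/ceph_flock() hooks declared here and wired into the file operations, ordinary POSIX (fcntl) and BSD (flock) locks work on ceph files, arbitrated by the MDS. A minimal userspace demonstration (the mount-point path is made up):

	#include <fcntl.h>
	#include <stdio.h>
	#include <sys/file.h>
	#include <unistd.h>

	int main(void)
	{
		struct flock fl = {
			.l_type = F_WRLCK,	/* exclusive; served by ceph_lock() */
			.l_whence = SEEK_SET,	/* l_start/l_len 0: whole file */
		};
		int fd = open("/mnt/ceph/lockfile", O_RDWR | O_CREAT, 0644);

		if (fd < 0)
			return 1;
		if (fcntl(fd, F_SETLKW, &fl) == 0)
			printf("fcntl write lock granted\n");
		if (flock(fd, LOCK_EX) == 0)	/* BSD-style; ceph_flock() */
			printf("flock granted\n");
		close(fd);
		return 0;
	}
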
index 68aeebc696817ad537f7cac9997b093b5287a5cd..097a2654c00f517c004ef429b3c0046664f205c3 100644 (file)
@@ -337,6 +337,8 @@ void __ceph_destroy_xattrs(struct ceph_inode_info *ci)
 }
 
 static int __build_xattrs(struct inode *inode)
+       __releases(inode->i_lock)
+       __acquires(inode->i_lock)
 {
        u32 namelen;
        u32 numattr = 0;
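
The __releases/__acquires annotations added to __build_xattrs() tell sparse that the function is entered with inode->i_lock held, drops it internally, and re-takes it before returning, keeping lock-context checking balanced. The general shape being annotated (sketch, assuming a caller that holds the lock):

	static void helper(struct inode *inode)
		__releases(inode->i_lock)
		__acquires(inode->i_lock)
	{
		spin_unlock(&inode->i_lock);
		/* work that must not run under the spinlock,
		 * e.g. a sleeping allocation */
		spin_lock(&inode->i_lock);
	}
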