ceph: introduce a new inode flag indicating if cached dentries are ordered
authorYan, Zheng <zyan@redhat.com>
Wed, 22 Oct 2014 01:09:56 +0000 (18:09 -0700)
committerIlya Dryomov <idryomov@redhat.com>
Wed, 17 Dec 2014 17:09:50 +0000 (20:09 +0300)
After creating/deleting/renaming file, offsets of sibling dentries may
change. So we can not use cached dentries to satisfy readdir. But we can
still use the cached dentries to conclude -ENOENT for lookup.

This patch introduces a new inode flag indicating if child dentries are
ordered. The flag is set at the same time marking a directory complete.
After creating/deleting/renaming file, we clear the flag on directory
inode. This prevents ceph_readdir() from using cached dentries to satisfy
readdir syscall.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/dir.c
fs/ceph/inode.c
fs/ceph/super.h

index e6d63f8f98c0a79891396313c0b0d1252e1ef0fb..652619950fa92834ecb79580c46bcff7d2aa1a76 100644 (file)
@@ -183,7 +183,7 @@ more:
        spin_unlock(&parent->d_lock);
 
        /* make sure a dentry wasn't dropped while we didn't have parent lock */
-       if (!ceph_dir_is_complete(dir)) {
+       if (!ceph_dir_is_complete_ordered(dir)) {
                dout(" lost dir complete on %p; falling back to mds\n", dir);
                dput(dentry);
                err = -EAGAIN;
@@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 
        /* always start with . and .. */
        if (ctx->pos == 0) {
-               /* note dir version at start of readdir so we can tell
-                * if any dentries get dropped */
-               fi->dir_release_count = atomic_read(&ci->i_release_count);
-
                dout("readdir off 0 -> '.'\n");
                if (!dir_emit(ctx, ".", 1, 
                            ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        if ((ctx->pos == 2 || fi->dentry) &&
            !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
            ceph_snap(inode) != CEPH_SNAPDIR &&
-           __ceph_dir_is_complete(ci) &&
+           __ceph_dir_is_complete_ordered(ci) &&
            __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
                u32 shared_gen = ci->i_shared_gen;
                spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 
        /* proceed with a normal readdir */
 
+       if (ctx->pos == 2) {
+               /* note dir version at start of readdir so we can tell
+                * if any dentries get dropped */
+               fi->dir_release_count = atomic_read(&ci->i_release_count);
+               fi->dir_ordered_count = ci->i_ordered_count;
+       }
+
 more:
        /* do we have the correct frag content buffered? */
        if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@ more:
         */
        spin_lock(&ci->i_ceph_lock);
        if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
-               dout(" marking %p complete\n", inode);
-               __ceph_dir_set_complete(ci, fi->dir_release_count);
+               if (ci->i_ordered_count == fi->dir_ordered_count)
+                       dout(" marking %p complete and ordered\n", inode);
+               else
+                       dout(" marking %p complete\n", inode);
+               __ceph_dir_set_complete(ci, fi->dir_release_count,
+                                       fi->dir_ordered_count);
        }
        spin_unlock(&ci->i_ceph_lock);
 
index 7b6139004401acfdea7753652dc3f0b3d6165150..72607c17e6fd479d246fcf164db8c948488270ae 100644 (file)
@@ -389,6 +389,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_version = 0;
        ci->i_time_warp_seq = 0;
        ci->i_ceph_flags = 0;
+       ci->i_ordered_count = 0;
        atomic_set(&ci->i_release_count, 1);
        atomic_set(&ci->i_complete_count, 0);
        ci->i_symlink = NULL;
@@ -845,7 +846,8 @@ static int fill_inode(struct inode *inode,
            (issued & CEPH_CAP_FILE_EXCL) == 0 &&
            !__ceph_dir_is_complete(ci)) {
                dout(" marking %p complete (empty)\n", inode);
-               __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
+               __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
+                                       ci->i_ordered_count);
        }
 
        /* were we issued a capability? */
@@ -1206,8 +1208,8 @@ retry_lookup:
                        ceph_invalidate_dentry_lease(dn);
 
                        /* d_move screws up sibling dentries' offsets */
-                       ceph_dir_clear_complete(dir);
-                       ceph_dir_clear_complete(olddir);
+                       ceph_dir_clear_ordered(dir);
+                       ceph_dir_clear_ordered(olddir);
 
                        dout("dn %p gets new offset %lld\n", req->r_old_dentry,
                             ceph_dentry(req->r_old_dentry)->offset);
@@ -1219,6 +1221,7 @@ retry_lookup:
                if (!rinfo->head->is_target) {
                        dout("fill_trace null dentry\n");
                        if (dn->d_inode) {
+                               ceph_dir_clear_ordered(dir);
                                dout("d_delete %p\n", dn);
                                d_delete(dn);
                        } else {
@@ -1235,7 +1238,7 @@ retry_lookup:
 
                /* attach proper inode */
                if (!dn->d_inode) {
-                       ceph_dir_clear_complete(dir);
+                       ceph_dir_clear_ordered(dir);
                        ihold(in);
                        dn = splice_dentry(dn, in, &have_lease);
                        if (IS_ERR(dn)) {
@@ -1265,7 +1268,7 @@ retry_lookup:
                BUG_ON(!dir);
                BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
                dout(" linking snapped dir %p to dn %p\n", in, dn);
-               ceph_dir_clear_complete(dir);
+               ceph_dir_clear_ordered(dir);
                ihold(in);
                dn = splice_dentry(dn, in, NULL);
                if (IS_ERR(dn)) {
index b82f507979b82acf064da98ef180db8b65014929..aca22879b41f3769f19ac47065fad8a916eb441f 100644 (file)
@@ -256,6 +256,7 @@ struct ceph_inode_info {
        u32 i_time_warp_seq;
 
        unsigned i_ceph_flags;
+       int i_ordered_count;
        atomic_t i_release_count;
        atomic_t i_complete_count;
 
@@ -434,14 +435,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 /*
  * Ceph inode.
  */
-#define CEPH_I_NODELAY   4  /* do not delay cap release */
-#define CEPH_I_FLUSH     8  /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH  16  /* do not flush dirty caps */
+#define CEPH_I_DIR_ORDERED     1  /* dentries in dir are ordered */
+#define CEPH_I_NODELAY         4  /* do not delay cap release */
+#define CEPH_I_FLUSH           8  /* do not delay flush of dirty metadata */
+#define CEPH_I_NOFLUSH         16 /* do not flush dirty caps */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
-                                          int release_count)
+                                          int release_count, int ordered_count)
 {
        atomic_set(&ci->i_complete_count, release_count);
+       if (ci->i_ordered_count == ordered_count)
+               ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
+       else
+               ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
 }
 
 static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +461,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
                atomic_read(&ci->i_release_count);
 }
 
+static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
+{
+       return __ceph_dir_is_complete(ci) &&
+               (ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+}
+
 static inline void ceph_dir_clear_complete(struct inode *inode)
 {
        __ceph_dir_clear_complete(ceph_inode(inode));
 }
 
-static inline bool ceph_dir_is_complete(struct inode *inode)
+static inline void ceph_dir_clear_ordered(struct inode *inode)
 {
-       return __ceph_dir_is_complete(ceph_inode(inode));
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       spin_lock(&ci->i_ceph_lock);
+       ci->i_ordered_count++;
+       ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+       spin_unlock(&ci->i_ceph_lock);
 }
 
+static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       bool ret;
+       spin_lock(&ci->i_ceph_lock);
+       ret = __ceph_dir_is_complete_ordered(ci);
+       spin_unlock(&ci->i_ceph_lock);
+       return ret;
+}
 
 /* find a specific frag @f */
 extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +605,7 @@ struct ceph_file_info {
        char *last_name;       /* last entry in previous chunk */
        struct dentry *dentry; /* next dentry (for dcache readdir) */
        int dir_release_count;
+       int dir_ordered_count;
 
        /* used for -o dirstat read() on directory thing */
        char *dir_info;