* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
ceph: ensure prealloc_blob is in place when removing xattr
rbd: initialize snap_rwsem in rbd_add()
ceph: enable/disable dentry complete flags via mount option
vfs: export symbol d_find_any_alias()
ceph: always initialize the dentry in open_root_dentry()
libceph: remove useless return value for osd_client __send_request()
ceph: avoid iput() while holding spinlock in ceph_dir_fsync
ceph: avoid useless dget/dput in encode_fh
ceph: dereference pointer after checking for NULL
crush: fix force for non-root TAKE
ceph: remove unnecessary d_fsdata conditional checks
ceph: Use kmemdup rather than duplicating its implementation
Fix up conflicts in fs/ceph/super.c (d_alloc_root() failure handling vs
always initialize the dentry in open_root_dentry)
must rely on TCP's error correction to detect data corruption
in the data payload.
- noasyncreaddir
- Disable client's use its local cache to satisfy readdir
- requests. (This does not change correctness; the client uses
- cached metadata only when a lease or capability ensures it is
- valid.)
+ dcache
+ Use the dcache contents to perform negative lookups and
+ readdir when the client has the entire directory contents in
+ its cache. (This does not change correctness; the client uses
+ cached metadata only when a lease or capability ensures it is
+ valid.)
+
+ nodcache
+ Do not use the dcache as above. This avoids a significant amount of
+ complex code, sacrificing performance without affecting correctness,
+ and is useful for tracking down bugs.
+ noasyncreaddir
+ Do not use the dcache as above for readdir.
More Information
================
INIT_LIST_HEAD(&rbd_dev->node);
INIT_LIST_HEAD(&rbd_dev->snaps);
+ init_rwsem(&rbd_dev->header.snap_rwsem);
+
/* generate unique id: find highest unique id, add one */
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
spin_lock(&dentry->d_lock);
di = ceph_dentry(dentry);
- if (di && di->lease_session) {
+ if (di->lease_session) {
s = di->lease_session;
spin_lock(&s->s_cap_lock);
gen = s->s_cap_gen;
struct ceph_dentry_info *di = ceph_dentry(dentry);
dout("d_release %p\n", dentry);
- if (di) {
- ceph_dentry_lru_del(dentry);
- if (di->lease_session)
- ceph_put_mds_session(di->lease_session);
- kmem_cache_free(ceph_dentry_cachep, di);
- dentry->d_fsdata = NULL;
- }
+ ceph_dentry_lru_del(dentry);
+ if (di->lease_session)
+ ceph_put_mds_session(di->lease_session);
+ kmem_cache_free(ceph_dentry_cachep, di);
+ dentry->d_fsdata = NULL;
}
static int ceph_snapdir_d_revalidate(struct dentry *dentry,
*/
void ceph_dir_set_complete(struct inode *inode)
{
- /* not yet implemented */
+ struct dentry *dentry = d_find_any_alias(inode);
+
+ if (dentry && ceph_dentry(dentry) &&
+ ceph_test_mount_opt(ceph_sb_to_client(dentry->d_sb), DCACHE)) {
+ dout(" marking %p (%p) complete\n", inode, dentry);
+ set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+ }
+ dput(dentry);
}
void ceph_dir_clear_complete(struct inode *inode)
{
- /* not yet implemented */
+ struct dentry *dentry = d_find_any_alias(inode);
+
+ if (dentry && ceph_dentry(dentry)) {
+ dout(" marking %p (%p) complete\n", inode, dentry);
+ set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+ }
+ dput(dentry);
}
bool ceph_dir_test_complete(struct inode *inode)
{
- /* not yet implemented */
+ struct dentry *dentry = d_find_any_alias(inode);
+
+ if (dentry && ceph_dentry(dentry)) {
+ dout(" marking %p (%p) NOT complete\n", inode, dentry);
+ clear_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+ }
+ dput(dentry);
return false;
}
do {
ceph_mdsc_get_request(req);
spin_unlock(&ci->i_unsafe_lock);
+
dout("dir_fsync %p wait on tid %llu (until %llu)\n",
inode, req->r_tid, last_tid);
if (req->r_timeout) {
} else {
wait_for_completion(&req->r_safe_completion);
}
- spin_lock(&ci->i_unsafe_lock);
ceph_mdsc_put_request(req);
+ spin_lock(&ci->i_unsafe_lock);
if (ret || list_empty(head))
break;
req = list_entry(head->next,
dout("dentry_lru_add %p %p '%.*s'\n", di, dn,
dn->d_name.len, dn->d_name.name);
- if (di) {
- mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
- spin_lock(&mdsc->dentry_lru_lock);
- list_add_tail(&di->lru, &mdsc->dentry_lru);
- mdsc->num_dentry++;
- spin_unlock(&mdsc->dentry_lru_lock);
- }
+ mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+ spin_lock(&mdsc->dentry_lru_lock);
+ list_add_tail(&di->lru, &mdsc->dentry_lru);
+ mdsc->num_dentry++;
+ spin_unlock(&mdsc->dentry_lru_lock);
}
void ceph_dentry_lru_touch(struct dentry *dn)
dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn,
dn->d_name.len, dn->d_name.name, di->offset);
- if (di) {
- mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
- spin_lock(&mdsc->dentry_lru_lock);
- list_move_tail(&di->lru, &mdsc->dentry_lru);
- spin_unlock(&mdsc->dentry_lru_lock);
- }
+ mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+ spin_lock(&mdsc->dentry_lru_lock);
+ list_move_tail(&di->lru, &mdsc->dentry_lru);
+ spin_unlock(&mdsc->dentry_lru_lock);
}
void ceph_dentry_lru_del(struct dentry *dn)
dout("dentry_lru_del %p %p '%.*s'\n", di, dn,
dn->d_name.len, dn->d_name.name);
- if (di) {
- mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
- spin_lock(&mdsc->dentry_lru_lock);
- list_del_init(&di->lru);
- mdsc->num_dentry--;
- spin_unlock(&mdsc->dentry_lru_lock);
- }
+ mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+ spin_lock(&mdsc->dentry_lru_lock);
+ list_del_init(&di->lru);
+ mdsc->num_dentry--;
+ spin_unlock(&mdsc->dentry_lru_lock);
}
/*
return -EINVAL;
spin_lock(&dentry->d_lock);
- parent = dget(dentry->d_parent);
- spin_unlock(&dentry->d_lock);
-
+ parent = dentry->d_parent;
if (*max_len >= connected_handle_length) {
dout("encode_fh %p connectable\n", dentry);
cfh->ino = ceph_ino(dentry->d_inode);
*max_len = handle_length;
type = 255;
}
- dput(parent);
+ spin_unlock(&dentry->d_lock);
return type;
}
{
struct dentry *dir = dn->d_parent;
struct inode *inode = dir->d_inode;
- struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_inode_info *ci;
struct ceph_dentry_info *di;
BUG_ON(!inode);
+ ci = ceph_inode(inode);
di = ceph_dentry(dn);
spin_lock(&ci->i_ceph_lock);
di = ceph_dentry(dentry);
switch (h->action) {
case CEPH_MDS_LEASE_REVOKE:
- if (di && di->lease_session == session) {
+ if (di->lease_session == session) {
if (ceph_seq_cmp(di->lease_seq, seq) > 0)
h->seq = cpu_to_le32(di->lease_seq);
__ceph_mdsc_drop_dentry_lease(dentry);
break;
case CEPH_MDS_LEASE_RENEW:
- if (di && di->lease_session == session &&
+ if (di->lease_session == session &&
di->lease_gen == session->s_cap_gen &&
di->lease_renew_from &&
di->lease_renew_after == 0) {
Opt_rbytes,
Opt_norbytes,
Opt_noasyncreaddir,
+ Opt_dcache,
+ Opt_nodcache,
Opt_ino32,
};
{Opt_rbytes, "rbytes"},
{Opt_norbytes, "norbytes"},
{Opt_noasyncreaddir, "noasyncreaddir"},
+ {Opt_dcache, "dcache"},
+ {Opt_nodcache, "nodcache"},
{Opt_ino32, "ino32"},
{-1, NULL}
};
case Opt_noasyncreaddir:
fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
break;
+ case Opt_dcache:
+ fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
+ break;
+ case Opt_nodcache:
+ fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
+ break;
case Opt_ino32:
fsopt->flags |= CEPH_MOUNT_OPT_INO32;
break;
seq_puts(m, ",norbytes");
if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
seq_puts(m, ",noasyncreaddir");
+ if (fsopt->flags & CEPH_MOUNT_OPT_DCACHE)
+ seq_puts(m, ",dcache");
+ else
+ seq_puts(m, ",nodcache");
if (fsopt->wsize)
seq_printf(m, ",wsize=%d", fsopt->wsize);
root = ERR_PTR(-ENOMEM);
goto out;
}
- ceph_init_dentry(root);
} else {
root = d_obtain_alias(inode);
}
+ ceph_init_dentry(root);
dout("open_root_inode success, root dentry is %p\n", root);
} else {
root = ERR_PTR(err);
#define CEPH_MOUNT_OPT_RBYTES (1<<5) /* dir st_bytes = rbytes */
#define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */
#define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */
+#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES)
struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
int issued;
int err;
+ int required_blob_size;
int dirty;
if (ceph_snap(inode) != CEPH_NOSNAP)
return -EOPNOTSUPP;
}
+ err = -ENOMEM;
spin_lock(&ci->i_ceph_lock);
__build_xattrs(inode);
+retry:
issued = __ceph_caps_issued(ci, NULL);
dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
if (!(issued & CEPH_CAP_XATTR_EXCL))
goto do_sync;
+ required_blob_size = __get_required_blob_size(ci, 0, 0);
+
+ if (!ci->i_xattrs.prealloc_blob ||
+ required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
+ struct ceph_buffer *blob;
+
+ spin_unlock(&ci->i_ceph_lock);
+ dout(" preaallocating new blob size=%d\n", required_blob_size);
+ blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
+ if (!blob)
+ goto out;
+ spin_lock(&ci->i_ceph_lock);
+ if (ci->i_xattrs.prealloc_blob)
+ ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+ ci->i_xattrs.prealloc_blob = blob;
+ goto retry;
+ }
+
err = __remove_xattr_by_name(ceph_inode(inode), name);
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
ci->i_xattrs.dirty = true;
do_sync:
spin_unlock(&ci->i_ceph_lock);
err = ceph_send_removexattr(dentry, name);
+out:
return err;
}
return alias;
}
-static struct dentry * d_find_any_alias(struct inode *inode)
+/**
+ * d_find_any_alias - find any alias for a given inode
+ * @inode: inode to find an alias for
+ *
+ * If any aliases exist for the given inode, take and return a
+ * reference for one of them. If no aliases exist, return %NULL.
+ */
+struct dentry *d_find_any_alias(struct inode *inode)
{
struct dentry *de;
spin_unlock(&inode->i_lock);
return de;
}
-
+EXPORT_SYMBOL(d_find_any_alias);
/**
* d_obtain_alias - find or allocate a dentry for a given inode
extern struct dentry * d_alloc_pseudo(struct super_block *, const struct qstr *);
extern struct dentry * d_splice_alias(struct inode *, struct dentry *);
extern struct dentry * d_add_ci(struct dentry *, struct inode *, struct qstr *);
+extern struct dentry *d_find_any_alias(struct inode *inode);
extern struct dentry * d_obtain_alias(struct inode *);
extern void shrink_dcache_sb(struct super_block *);
extern void shrink_dcache_parent(struct dentry *);
switch (rule->steps[step].op) {
case CRUSH_RULE_TAKE:
w[0] = rule->steps[step].arg1;
- if (force_pos >= 0) {
- BUG_ON(force_context[force_pos] != w[0]);
+
+ /* find position in force_context/hierarchy */
+ while (force_pos >= 0 &&
+ force_context[force_pos] != w[0])
force_pos--;
- }
+ /* and move past it */
+ if (force_pos >= 0)
+ force_pos--;
+
wsize = 1;
break;
const struct ceph_crypto_key *src)
{
memcpy(dst, src, sizeof(struct ceph_crypto_key));
- dst->key = kmalloc(src->len, GFP_NOFS);
+ dst->key = kmemdup(src->key, src->len, GFP_NOFS);
if (!dst->key)
return -ENOMEM;
- memcpy(dst->key, src->key, src->len);
return 0;
}
struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
-static int __send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
+static void __send_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
static int op_needs_trail(int op)
{
/*
* caller should hold map_sem (for read) and request_mutex
*/
-static int __send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static void __send_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead;
ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_con_send(&req->r_osd->o_con, req->r_request);
req->r_sent = req->r_osd->o_incarnation;
- return 0;
}
/*
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_next_osdmap(&osdc->client->monc);
} else {
- rc = __send_request(osdc, req);
- if (rc) {
- if (nofail) {
- dout("osdc_start_request failed send, "
- " will retry %lld\n", req->r_tid);
- rc = 0;
- } else {
- __unregister_request(osdc, req);
- }
- }
+ __send_request(osdc, req);
}
+ rc = 0;
}
out_unlock: