*
* Spinlocks: Mohamed Abbas (abbas.mohamed@intel.com)
* Lockless receive & send, fd based notify:
- * Manfred Spraul (manfred@colorfullife.com)
+ * Manfred Spraul (manfred@colorfullife.com)
*
* Audit: George Wilson (ltcgcw@us.ibm.com)
*
#define RECV 1
#define STATE_NONE 0
-#define STATE_PENDING 1
-#define STATE_READY 2
+#define STATE_READY 1
struct posix_msg_tree_node {
struct rb_node rb_node;
struct mq_attr attr;
struct sigevent notify;
- struct pid* notify_owner;
+ struct pid *notify_owner;
struct user_namespace *notify_user_ns;
struct user_struct *user; /* user who created, for accounting */
struct sock *notify_sock;
static struct kmem_cache *mqueue_inode_cachep;
-static struct ctl_table_header * mq_sysctl_table;
+static struct ctl_table_header *mq_sysctl_table;
static inline struct mqueue_inode_info *MQUEUE_I(struct inode *inode)
{
static int mqueue_unlink(struct inode *dir, struct dentry *dentry)
{
- struct inode *inode = dentry->d_inode;
+ struct inode *inode = d_inode(dentry);
dir->i_ctime = dir->i_mtime = dir->i_atime = CURRENT_TIME;
dir->i_size -= DIRENT_SIZE;
- drop_nlink(inode);
- dput(dentry);
- return 0;
+ drop_nlink(inode);
+ dput(dentry);
+ return 0;
}
/*
wq_add(info, sr, ewp);
for (;;) {
- set_current_state(TASK_INTERRUPTIBLE);
+ __set_current_state(TASK_INTERRUPTIBLE);
spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME);
- while (ewp->state == STATE_PENDING)
- cpu_relax();
-
if (ewp->state == STATE_READY) {
retval = 0;
goto out;
static inline void set_cookie(struct sk_buff *skb, char code)
{
- ((char*)skb->data)[NOTIFY_COOKIE_LEN-1] = code;
+ ((char *)skb->data)[NOTIFY_COOKIE_LEN-1] = code;
}
/*
if ((oflag & O_ACCMODE) == (O_RDWR | O_WRONLY))
return ERR_PTR(-EINVAL);
acc = oflag2acc[oflag & O_ACCMODE];
- if (inode_permission(path->dentry->d_inode, acc))
+ if (inode_permission(d_inode(path->dentry), acc))
return ERR_PTR(-EACCES);
return dentry_open(path, oflag, current_cred());
}
ro = mnt_want_write(mnt); /* we'll drop it in any case */
error = 0;
- mutex_lock(&root->d_inode->i_mutex);
+ mutex_lock(&d_inode(root)->i_mutex);
path.dentry = lookup_one_len(name->name, root, strlen(name->name));
if (IS_ERR(path.dentry)) {
error = PTR_ERR(path.dentry);
path.mnt = mntget(mnt);
if (oflag & O_CREAT) {
- if (path.dentry->d_inode) { /* entry already exists */
+ if (d_really_is_positive(path.dentry)) { /* entry already exists */
audit_inode(name, path.dentry, 0);
if (oflag & O_EXCL) {
error = -EEXIST;
goto out;
}
audit_inode_parent_hidden(name, root);
- filp = do_create(ipc_ns, root->d_inode,
+ filp = do_create(ipc_ns, d_inode(root),
&path, oflag, mode,
u_attr ? &attr : NULL);
}
} else {
- if (!path.dentry->d_inode) {
+ if (d_really_is_negative(path.dentry)) {
error = -ENOENT;
goto out;
}
put_unused_fd(fd);
fd = error;
}
- mutex_unlock(&root->d_inode->i_mutex);
+ mutex_unlock(&d_inode(root)->i_mutex);
if (!ro)
mnt_drop_write(mnt);
out_putname:
err = mnt_want_write(mnt);
if (err)
goto out_name;
- mutex_lock_nested(&mnt->mnt_root->d_inode->i_mutex, I_MUTEX_PARENT);
+ mutex_lock_nested(&d_inode(mnt->mnt_root)->i_mutex, I_MUTEX_PARENT);
dentry = lookup_one_len(name->name, mnt->mnt_root,
strlen(name->name));
if (IS_ERR(dentry)) {
goto out_unlock;
}
- inode = dentry->d_inode;
+ inode = d_inode(dentry);
if (!inode) {
err = -ENOENT;
} else {
ihold(inode);
- err = vfs_unlink(dentry->d_parent->d_inode, dentry);
+ err = vfs_unlink(d_inode(dentry->d_parent), dentry, NULL);
}
dput(dentry);
out_unlock:
- mutex_unlock(&mnt->mnt_root->d_inode->i_mutex);
+ mutex_unlock(&d_inode(mnt->mnt_root)->i_mutex);
if (inode)
iput(inode);
mnt_drop_write(mnt);
* list of waiting receivers. A sender checks that list before adding the new
* message into the message array. If there is a waiting receiver, then it
* bypasses the message array and directly hands the message over to the
- * receiver.
- * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * receiver. The receiver accepts the message and returns without grabbing the
+ * queue spinlock:
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ * before this wakeup (due to a timeout or a signal) it will either see
+ * STATE_READY and continue or acquire the lock to check the state again.
*
* The same algorithm is used for senders.
*/
/* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue).
*/
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info,
struct msg_msg *message,
struct ext_wait_queue *receiver)
{
receiver->msg = message;
list_del(&receiver->list);
- receiver->state = STATE_PENDING;
- wake_up_process(receiver->task);
- smp_wmb();
+ wake_q_add(wake_q, receiver->task);
+ /*
+ * Rely on the implicit cmpxchg barrier from wake_q_add such
+ * that we can ensure that updating receiver->state is the last
+ * write operation: As once set, the receiver can continue,
+ * and if we don't have the reference count from the wake_q,
+ * yet, at that point we can later have a use-after-free
+ * condition and bogus wakeup.
+ */
receiver->state = STATE_READY;
}
/* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
* gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+ struct mqueue_inode_info *info)
{
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
}
if (msg_insert(sender->msg, info))
return;
+
list_del(&sender->list);
- sender->state = STATE_PENDING;
- wake_up_process(sender->task);
- smp_wmb();
+ wake_q_add(wake_q, sender->task);
sender->state = STATE_READY;
}
struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0;
+ WAKE_Q(wake_q);
if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts);
goto out_fput;
}
info = MQUEUE_I(inode);
- audit_inode(NULL, f.file->f_path.dentry, 0);
+ audit_file(f.file);
if (unlikely(!(f.file->f_mode & FMODE_WRITE))) {
ret = -EBADF;
} else {
receiver = wq_get_first_waiter(info, RECV);
if (receiver) {
- pipelined_send(info, msg_ptr, receiver);
+ pipelined_send(&wake_q, info, msg_ptr, receiver);
} else {
/* adds message to the queue */
ret = msg_insert(msg_ptr, info);
}
out_unlock:
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
out_free:
if (ret)
free_msg(msg_ptr);
goto out_fput;
}
info = MQUEUE_I(inode);
- audit_inode(NULL, f.file->f_path.dentry, 0);
+ audit_file(f.file);
if (unlikely(!(f.file->f_mode & FMODE_READ))) {
ret = -EBADF;
msg_ptr = wait.msg;
}
} else {
+ WAKE_Q(wake_q);
+
msg_ptr = msg_get(info);
inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME;
/* There is now free space in queue. */
- pipelined_receive(info);
+ pipelined_receive(&wake_q, info);
spin_unlock(&info->lock);
+ wake_up_q(&wake_q);
ret = 0;
}
if (ret == 0) {
out_fput:
fdput(f);
out:
- if (sock) {
+ if (sock)
netlink_detachskb(sock, nc);
- } else if (nc) {
+ else if (nc)
dev_kfree_skb(nc);
- }
+
return ret;
}
return error;
}
-__initcall(init_mqueue_fs);
+device_initcall(init_mqueue_fs);