Merge tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm
author Linus Torvalds <torvalds@linux-foundation.org>
Wed, 23 May 2012 02:31:38 +0000 (19:31 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Wed, 23 May 2012 02:31:38 +0000 (19:31 -0700)
Pull dlm updates from David Teigland:
 "This set includes some minor fixes and improvements.  The one large
  patch addresses the special "nodir" mode, which has been a long
  neglected proof of concept, but with these fixes seems to be quite
  usable.  It allows the resource master to be assigned statically
  instead of dynamically, which can improve performance if there is
  little locality and most resources are shared."

* tag 'dlm-3.5' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: NULL dereference on failure in kmem_cache_create()
  gfs2: fix recovery during unmount
  dlm: fixes for nodir mode
  dlm: improve error and debug messages
  dlm: avoid unnecessary search in search_rsb
  dlm: limit rcom debug messages
  dlm: fix waiter recovery
  dlm: prevent connections during shutdown

16 files changed:
fs/dlm/ast.c
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lock.h
fs/dlm/lockspace.c
fs/dlm/lowcomms.c
fs/dlm/memory.c
fs/dlm/rcom.c
fs/dlm/recover.c
fs/dlm/recoverd.c
fs/dlm/requestqueue.c
fs/gfs2/incore.h
fs/gfs2/lock_dlm.c
fs/gfs2/ops_fstype.c
fs/gfs2/sys.c
include/linux/dlm.h

index 90e5997262ea77389902b8ea9c97e17ac7384802..63dc19c54d5a0dc567210d86b6b3a676934aa191 100644 (file)
@@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
        }
        mutex_unlock(&ls->ls_cb_mutex);
 
-       log_debug(ls, "dlm_callback_resume %d", count);
+       if (count)
+               log_debug(ls, "dlm_callback_resume %d", count);
 }
 
index 3a564d197e99f2e822f16a6730425575c661248a..bc342f7ac3afe38a37dd05e7873282d212f15c0f 100644 (file)
@@ -38,6 +38,7 @@
 #include <linux/miscdevice.h>
 #include <linux/mutex.h>
 #include <linux/idr.h>
+#include <linux/ratelimit.h>
 #include <asm/uaccess.h>
 
 #include <linux/dlm.h>
@@ -74,6 +75,13 @@ do { \
                       (ls)->ls_name , ##args); \
 } while (0)
 
+#define log_limit(ls, fmt, args...) \
+do { \
+       if (dlm_config.ci_log_debug) \
+               printk_ratelimited(KERN_DEBUG "dlm: %s: " fmt "\n", \
+                       (ls)->ls_name , ##args); \
+} while (0)
+
 #define DLM_ASSERT(x, do) \
 { \
   if (!(x)) \
@@ -263,6 +271,8 @@ struct dlm_lkb {
        ktime_t                 lkb_last_cast_time;     /* for debugging */
        ktime_t                 lkb_last_bast_time;     /* for debugging */
 
+       uint64_t                lkb_recover_seq; /* from ls_recover_seq */
+
        char                    *lkb_lvbptr;
        struct dlm_lksb         *lkb_lksb;      /* caller's status block */
        void                    (*lkb_astfn) (void *astparam);
@@ -317,7 +327,7 @@ enum rsb_flags {
        RSB_NEW_MASTER,
        RSB_NEW_MASTER2,
        RSB_RECOVER_CONVERT,
-       RSB_LOCKS_PURGED,
+       RSB_RECOVER_GRANT,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
@@ -563,6 +573,7 @@ struct dlm_ls {
        struct mutex            ls_requestqueue_mutex;
        struct dlm_rcom         *ls_recover_buf;
        int                     ls_recover_nodeid; /* for debugging */
+       unsigned int            ls_recover_locks_in; /* for log info */
        uint64_t                ls_rcom_seq;
        spinlock_t              ls_rcom_spin;
        struct list_head        ls_recover_list;
@@ -589,6 +600,7 @@ struct dlm_ls {
 #define LSFL_UEVENT_WAIT       5
 #define LSFL_TIMEWARN          6
 #define LSFL_CB_DELAY          7
+#define LSFL_NODIR             8
 
 /* much of this is just saving user space pointers associated with the
    lock that we pass back to the user lib with an ast */
@@ -636,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls)
 
 static inline int dlm_no_directory(struct dlm_ls *ls)
 {
-       return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0;
+       return test_bit(LSFL_NODIR, &ls->ls_flags);
 }
 
 int dlm_netlink_init(void);
index 4c58d4a3adc4f29c6a2829bc54ac4be122c77ae7..bdafb65a523456f1898dcb466b24c05b5b792fa3 100644 (file)
@@ -160,11 +160,12 @@ static const int __quecvt_compat_matrix[8][8] = {
 
 void dlm_print_lkb(struct dlm_lkb *lkb)
 {
-       printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
-              "     status %d rqmode %d grmode %d wait_type %d\n",
+       printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
+              "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
-              lkb->lkb_grmode, lkb->lkb_wait_type);
+              lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
+              (unsigned long long)lkb->lkb_recover_seq);
 }
 
 static void dlm_print_rsb(struct dlm_rsb *r)
@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
 
 static inline int is_master_copy(struct dlm_lkb *lkb)
 {
-       if (lkb->lkb_flags & DLM_IFL_MSTCPY)
-               DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
 }
 
@@ -479,6 +478,9 @@ static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
                kref_get(&r->res_ref);
                goto out;
        }
+       if (error == -ENOTBLK)
+               goto out;
+
        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
        if (error)
                goto out;
@@ -586,6 +588,23 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
        return error;
 }
 
+static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
+{
+       struct rb_node *n;
+       struct dlm_rsb *r;
+       int i;
+
+       for (i = 0; i < ls->ls_rsbtbl_size; i++) {
+               spin_lock(&ls->ls_rsbtbl[i].lock);
+               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+                       r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       if (r->res_hash == hash)
+                               dlm_dump_rsb(r);
+               }
+               spin_unlock(&ls->ls_rsbtbl[i].lock);
+       }
+}
+
 /* This is only called to add a reference when the code already holds
    a valid reference to the rsb, so there's no need for locking. */
 
@@ -1064,8 +1083,9 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                goto out_del;
        }
 
-       log_error(ls, "remwait error %x reply %d flags %x no wait_type",
-                 lkb->lkb_id, mstype, lkb->lkb_flags);
+       log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
+                 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
+                 mstype, lkb->lkb_flags);
        return -1;
 
  out_del:
@@ -1498,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
        }
 
        lkb->lkb_rqmode = DLM_LOCK_IV;
+       lkb->lkb_highbast = 0;
 }
 
 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
-       lkb->lkb_highbast = 0;
 }
 
 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -1866,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
 /* Returns the highest requested mode of all blocked conversions; sets
    cw if there's a blocked conversion to DLM_LOCK_CW. */
 
-static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
+                                unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
        int hi, demoted, quit, grant_restart, demote_restart;
@@ -1885,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
                if (can_be_granted(r, lkb, 0, &deadlk)) {
                        grant_lock_pending(r, lkb);
                        grant_restart = 1;
+                       if (count)
+                               (*count)++;
                        continue;
                }
 
@@ -1918,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
        return max_t(int, high, hi);
 }
 
-static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
+static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
+                             unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
 
        list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
-               if (can_be_granted(r, lkb, 0, NULL))
+               if (can_be_granted(r, lkb, 0, NULL)) {
                        grant_lock_pending(r, lkb);
-                else {
+                       if (count)
+                               (*count)++;
+               } else {
                        high = max_t(int, lkb->lkb_rqmode, high);
                        if (lkb->lkb_rqmode == DLM_LOCK_CW)
                                *cw = 1;
@@ -1954,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
        return 0;
 }
 
-static void grant_pending_locks(struct dlm_rsb *r)
+static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
 {
        struct dlm_lkb *lkb, *s;
        int high = DLM_LOCK_IV;
        int cw = 0;
 
-       DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
+       if (!is_master(r)) {
+               log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
+               dlm_dump_rsb(r);
+               return;
+       }
 
-       high = grant_pending_convert(r, high, &cw);
-       high = grant_pending_wait(r, high, &cw);
+       high = grant_pending_convert(r, high, &cw, count);
+       high = grant_pending_wait(r, high, &cw, count);
 
        if (high == DLM_LOCK_IV)
                return;
@@ -2499,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
           before we try again to grant this one. */
 
        if (is_demoted(lkb)) {
-               grant_pending_convert(r, DLM_LOCK_IV, NULL);
+               grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
                if (_can_be_granted(r, lkb, 1)) {
                        grant_lock(r, lkb);
                        queue_cast(r, lkb, 0);
@@ -2527,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
 {
        switch (error) {
        case 0:
-               grant_pending_locks(r);
+               grant_pending_locks(r, NULL);
                /* grant_pending_locks also sends basts */
                break;
        case -EAGAIN:
@@ -2550,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
                              int error)
 {
-       grant_pending_locks(r);
+       grant_pending_locks(r, NULL);
 }
 
 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
@@ -2571,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
                              int error)
 {
        if (error)
-               grant_pending_locks(r);
+               grant_pending_locks(r, NULL);
 }
 
 /*
@@ -3372,7 +3402,7 @@ static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
        return error;
 }
 
-static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3412,14 +3442,15 @@ static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
                error = 0;
        if (error)
                dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3429,6 +3460,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
        if (error)
                goto fail;
 
+       if (lkb->lkb_remid != ms->m_lkid) {
+               log_error(ls, "receive_convert %x remid %x recover_seq %llu "
+                         "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
+                         (unsigned long long)lkb->lkb_recover_seq,
+                         ms->m_header.h_nodeid, ms->m_lkid);
+               error = -ENOENT;
+               goto fail;
+       }
+
        r = lkb->lkb_resource;
 
        hold_rsb(r);
@@ -3456,14 +3496,15 @@ static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3473,6 +3514,14 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
        if (error)
                goto fail;
 
+       if (lkb->lkb_remid != ms->m_lkid) {
+               log_error(ls, "receive_unlock %x remid %x remote %d %x",
+                         lkb->lkb_id, lkb->lkb_remid,
+                         ms->m_header.h_nodeid, ms->m_lkid);
+               error = -ENOENT;
+               goto fail;
+       }
+
        r = lkb->lkb_resource;
 
        hold_rsb(r);
@@ -3497,14 +3546,15 @@ static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
@@ -3532,25 +3582,23 @@ static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
-       return;
+       return 0;
 
  fail:
        setup_stub_lkb(ls, ms);
        send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
+       return error;
 }
 
-static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_grant from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        r = lkb->lkb_resource;
 
@@ -3570,20 +3618,18 @@ static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
-static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_bast from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        r = lkb->lkb_resource;
 
@@ -3595,10 +3641,12 @@ static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
                goto out;
 
        queue_bast(r, lkb, ms->m_bastmode);
+       lkb->lkb_highbast = ms->m_bastmode;
  out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3653,18 +3701,15 @@ static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
        do_purge(ls, ms->m_nodeid, ms->m_pid);
 }
 
-static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        struct dlm_rsb *r;
        int error, mstype, result;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_request_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        r = lkb->lkb_resource;
        hold_rsb(r);
@@ -3676,8 +3721,13 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        mstype = lkb->lkb_wait_type;
        error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
-       if (error)
+       if (error) {
+               log_error(ls, "receive_request_reply %x remote %d %x result %d",
+                         lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_result);
+               dlm_dump_rsb(r);
                goto out;
+       }
 
        /* Optimization: the dir node was also the master, so it took our
           lookup as a request and sent request reply instead of lookup reply */
@@ -3755,6 +3805,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
@@ -3793,8 +3844,11 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                break;
 
        default:
-               log_error(r->res_ls, "receive_convert_reply %x error %d",
-                         lkb->lkb_id, ms->m_result);
+               log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
+                         lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
+                         ms->m_result);
+               dlm_print_rsb(r);
+               dlm_print_lkb(lkb);
        }
 }
 
@@ -3821,20 +3875,18 @@ static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        put_rsb(r);
 }
 
-static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_convert_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        _receive_convert_reply(lkb, ms);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3873,20 +3925,18 @@ static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        put_rsb(r);
 }
 
-static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_unlock_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        _receive_unlock_reply(lkb, ms);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
@@ -3925,20 +3975,18 @@ static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
        put_rsb(r);
 }
 
-static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
+static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
 {
        struct dlm_lkb *lkb;
        int error;
 
        error = find_lkb(ls, ms->m_remid, &lkb);
-       if (error) {
-               log_debug(ls, "receive_cancel_reply from %d no lkb %x",
-                         ms->m_header.h_nodeid, ms->m_remid);
-               return;
-       }
+       if (error)
+               return error;
 
        _receive_cancel_reply(lkb, ms);
        dlm_put_lkb(lkb);
+       return 0;
 }
 
 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
@@ -3949,7 +3997,7 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
 
        error = find_lkb(ls, ms->m_lkid, &lkb);
        if (error) {
-               log_error(ls, "receive_lookup_reply no lkb");
+               log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
                return;
        }
 
@@ -3993,8 +4041,11 @@ static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
        dlm_put_lkb(lkb);
 }
 
-static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
+static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
+                            uint32_t saved_seq)
 {
+       int error = 0, noent = 0;
+
        if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
                log_debug(ls, "ignore non-member message %d from %d %x %x %d",
                          ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
@@ -4007,47 +4058,50 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
        /* messages sent to a master node */
 
        case DLM_MSG_REQUEST:
-               receive_request(ls, ms);
+               error = receive_request(ls, ms);
                break;
 
        case DLM_MSG_CONVERT:
-               receive_convert(ls, ms);
+               error = receive_convert(ls, ms);
                break;
 
        case DLM_MSG_UNLOCK:
-               receive_unlock(ls, ms);
+               error = receive_unlock(ls, ms);
                break;
 
        case DLM_MSG_CANCEL:
-               receive_cancel(ls, ms);
+               noent = 1;
+               error = receive_cancel(ls, ms);
                break;
 
        /* messages sent from a master node (replies to above) */
 
        case DLM_MSG_REQUEST_REPLY:
-               receive_request_reply(ls, ms);
+               error = receive_request_reply(ls, ms);
                break;
 
        case DLM_MSG_CONVERT_REPLY:
-               receive_convert_reply(ls, ms);
+               error = receive_convert_reply(ls, ms);
                break;
 
        case DLM_MSG_UNLOCK_REPLY:
-               receive_unlock_reply(ls, ms);
+               error = receive_unlock_reply(ls, ms);
                break;
 
        case DLM_MSG_CANCEL_REPLY:
-               receive_cancel_reply(ls, ms);
+               error = receive_cancel_reply(ls, ms);
                break;
 
        /* messages sent from a master node (only two types of async msg) */
 
        case DLM_MSG_GRANT:
-               receive_grant(ls, ms);
+               noent = 1;
+               error = receive_grant(ls, ms);
                break;
 
        case DLM_MSG_BAST:
-               receive_bast(ls, ms);
+               noent = 1;
+               error = receive_bast(ls, ms);
                break;
 
        /* messages sent to a dir node */
@@ -4075,6 +4129,37 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
        default:
                log_error(ls, "unknown message type %d", ms->m_type);
        }
+
+       /*
+        * When checking for ENOENT, we're checking the result of
+        * find_lkb(m_remid):
+        *
+        * The lock id referenced in the message wasn't found.  This may
+        * happen in normal usage for the async messages and cancel, so
+        * only use log_debug for them.
+        *
+        * Some errors are expected and normal.
+        */
+
+       if (error == -ENOENT && noent) {
+               log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
+                         ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+                         ms->m_lkid, saved_seq);
+       } else if (error == -ENOENT) {
+               log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
+                         ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
+                         ms->m_lkid, saved_seq);
+
+               if (ms->m_type == DLM_MSG_CONVERT)
+                       dlm_dump_rsb_hash(ls, ms->m_hash);
+       }
+
+       if (error == -EINVAL) {
+               log_error(ls, "receive %d inval from %d lkid %x remid %x "
+                         "saved_seq %u",
+                         ms->m_type, ms->m_header.h_nodeid,
+                         ms->m_lkid, ms->m_remid, saved_seq);
+       }
 }
 
 /* If the lockspace is in recovery mode (locking stopped), then normal
@@ -4092,16 +4177,17 @@ static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
                dlm_add_requestqueue(ls, nodeid, ms);
        } else {
                dlm_wait_requestqueue(ls);
-               _receive_message(ls, ms);
+               _receive_message(ls, ms, 0);
        }
 }
 
 /* This is called by dlm_recoverd to process messages that were saved on
    the requestqueue. */
 
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+                              uint32_t saved_seq)
 {
-       _receive_message(ls, ms);
+       _receive_message(ls, ms, saved_seq);
 }
 
 /* This is called by the midcomms layer when something is received for
@@ -4137,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
 
        ls = dlm_find_lockspace_global(hd->h_lockspace);
        if (!ls) {
-               if (dlm_config.ci_log_debug)
-                       log_print("invalid lockspace %x from %d cmd %d type %d",
-                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
+               if (dlm_config.ci_log_debug) {
+                       printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
+                               "%u from %d cmd %d type %d\n",
+                               hd->h_lockspace, nodeid, hd->h_cmd, type);
+               }
 
                if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
                        dlm_send_ls_not_ready(nodeid, &p->rcom);
@@ -4187,15 +4275,13 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
 /* A waiting lkb needs recovery if the master node has failed, or
    the master node is changing (only when no directory is used) */
 
-static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
+static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
+                                int dir_nodeid)
 {
-       if (dlm_is_removed(ls, lkb->lkb_nodeid))
+       if (dlm_no_directory(ls))
                return 1;
 
-       if (!dlm_no_directory(ls))
-               return 0;
-
-       if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
+       if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
                return 1;
 
        return 0;
@@ -4212,6 +4298,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
        struct dlm_lkb *lkb, *safe;
        struct dlm_message *ms_stub;
        int wait_type, stub_unlock_result, stub_cancel_result;
+       int dir_nodeid;
 
        ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
        if (!ms_stub) {
@@ -4223,13 +4310,21 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 
        list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
 
+               dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
+
                /* exclude debug messages about unlocks because there can be so
                   many and they aren't very interesting */
 
                if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
-                       log_debug(ls, "recover_waiter %x nodeid %d "
-                                 "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
-                                 lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
+                       log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+                                 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
+                                 lkb->lkb_id,
+                                 lkb->lkb_remid,
+                                 lkb->lkb_wait_type,
+                                 lkb->lkb_resource->res_nodeid,
+                                 lkb->lkb_nodeid,
+                                 lkb->lkb_wait_nodeid,
+                                 dir_nodeid);
                }
 
                /* all outstanding lookups, regardless of destination  will be
@@ -4240,7 +4335,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                        continue;
                }
 
-               if (!waiter_needs_recovery(ls, lkb))
+               if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
                        continue;
 
                wait_type = lkb->lkb_wait_type;
@@ -4373,8 +4468,11 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                ou = is_overlap_unlock(lkb);
                err = 0;
 
-               log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
-                         lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
+               log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
+                         "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
+                         "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
+                         r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
+                         dlm_dir_nodeid(r), oc, ou);
 
                /* At this point we assume that we won't get a reply to any
                   previous op or overlap op on this lock.  First, do a big
@@ -4426,9 +4524,12 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
                        }
                }
 
-               if (err)
-                       log_error(ls, "recover_waiters_post %x %d %x %d %d",
-                                 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
+               if (err) {
+                       log_error(ls, "waiter %x msg %d r_nodeid %d "
+                                 "dir_nodeid %d overlap %d %d",
+                                 lkb->lkb_id, mstype, r->res_nodeid,
+                                 dlm_dir_nodeid(r), oc, ou);
+               }
                unlock_rsb(r);
                put_rsb(r);
                dlm_put_lkb(lkb);
@@ -4437,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
        return error;
 }
 
-static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
-                       int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
+static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
+                             struct list_head *list)
 {
-       struct dlm_ls *ls = r->res_ls;
        struct dlm_lkb *lkb, *safe;
 
-       list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
-               if (test(ls, lkb)) {
-                       rsb_set_flag(r, RSB_LOCKS_PURGED);
-                       del_lkb(r, lkb);
-                       /* this put should free the lkb */
-                       if (!dlm_put_lkb(lkb))
-                               log_error(ls, "purged lkb not released");
-               }
+       list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+               if (!is_master_copy(lkb))
+                       continue;
+
+               /* don't purge lkbs we've added in recover_master_copy for
+                  the current recovery seq */
+
+               if (lkb->lkb_recover_seq == ls->ls_recover_seq)
+                       continue;
+
+               del_lkb(r, lkb);
+
+               /* this put should free the lkb */
+               if (!dlm_put_lkb(lkb))
+                       log_error(ls, "purged mstcpy lkb not released");
        }
 }
 
-static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
+void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
 {
-       return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
-}
+       struct dlm_ls *ls = r->res_ls;
 
-static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
-{
-       return is_master_copy(lkb);
+       purge_mstcpy_list(ls, r, &r->res_grantqueue);
+       purge_mstcpy_list(ls, r, &r->res_convertqueue);
+       purge_mstcpy_list(ls, r, &r->res_waitqueue);
 }
 
-static void purge_dead_locks(struct dlm_rsb *r)
+static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
+                           struct list_head *list,
+                           int nodeid_gone, unsigned int *count)
 {
-       purge_queue(r, &r->res_grantqueue, &purge_dead_test);
-       purge_queue(r, &r->res_convertqueue, &purge_dead_test);
-       purge_queue(r, &r->res_waitqueue, &purge_dead_test);
-}
+       struct dlm_lkb *lkb, *safe;
 
-void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
-{
-       purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
-       purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
-       purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
+       list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
+               if (!is_master_copy(lkb))
+                       continue;
+
+               if ((lkb->lkb_nodeid == nodeid_gone) ||
+                   dlm_is_removed(ls, lkb->lkb_nodeid)) {
+
+                       del_lkb(r, lkb);
+
+                       /* this put should free the lkb */
+                       if (!dlm_put_lkb(lkb))
+                               log_error(ls, "purged dead lkb not released");
+
+                       rsb_set_flag(r, RSB_RECOVER_GRANT);
+
+                       (*count)++;
+               }
+       }
 }
 
 /* Get rid of locks held by nodes that are gone. */
 
-int dlm_purge_locks(struct dlm_ls *ls)
+void dlm_recover_purge(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
+       struct dlm_member *memb;
+       int nodes_count = 0;
+       int nodeid_gone = 0;
+       unsigned int lkb_count = 0;
 
-       log_debug(ls, "dlm_purge_locks");
+       /* cache one removed nodeid to optimize the common
+          case of a single node removed */
+
+       list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
+               nodes_count++;
+               nodeid_gone = memb->nodeid;
+       }
+
+       if (!nodes_count)
+               return;
 
        down_write(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                hold_rsb(r);
                lock_rsb(r);
-               if (is_master(r))
-                       purge_dead_locks(r);
+               if (is_master(r)) {
+                       purge_dead_list(ls, r, &r->res_grantqueue,
+                                       nodeid_gone, &lkb_count);
+                       purge_dead_list(ls, r, &r->res_convertqueue,
+                                       nodeid_gone, &lkb_count);
+                       purge_dead_list(ls, r, &r->res_waitqueue,
+                                       nodeid_gone, &lkb_count);
+               }
                unlock_rsb(r);
                unhold_rsb(r);
-
-               schedule();
+               cond_resched();
        }
        up_write(&ls->ls_root_sem);
 
-       return 0;
+       if (lkb_count)
+               log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
+                         lkb_count, nodes_count);
 }
 
-static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
+static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
 {
        struct rb_node *n;
-       struct dlm_rsb *r, *r_ret = NULL;
+       struct dlm_rsb *r;
 
        spin_lock(&ls->ls_rsbtbl[bucket].lock);
        for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
-               if (!rsb_flag(r, RSB_LOCKS_PURGED))
+
+               if (!rsb_flag(r, RSB_RECOVER_GRANT))
+                       continue;
+               rsb_clear_flag(r, RSB_RECOVER_GRANT);
+               if (!is_master(r))
                        continue;
                hold_rsb(r);
-               rsb_clear_flag(r, RSB_LOCKS_PURGED);
-               r_ret = r;
-               break;
+               spin_unlock(&ls->ls_rsbtbl[bucket].lock);
+               return r;
        }
        spin_unlock(&ls->ls_rsbtbl[bucket].lock);
-       return r_ret;
+       return NULL;
 }
 
-void dlm_grant_after_purge(struct dlm_ls *ls)
+/*
+ * Attempt to grant locks on resources that we are the master of.
+ * Locks may have become grantable during recovery because locks
+ * from departed nodes have been purged (or not rebuilt), allowing
+ * previously blocked locks to now be granted.  The subset of rsb's
+ * we are interested in are those with lkb's on either the convert or
+ * waiting queues.
+ *
+ * Simplest would be to go through each master rsb and check for non-empty
+ * convert or waiting queues, and attempt to grant on those rsbs.
+ * Checking the queues requires lock_rsb, though, for which we'd need
+ * to release the rsbtbl lock.  This would make iterating through all
+ * rsb's very inefficient.  So, we rely on earlier recovery routines
+ * to set RECOVER_GRANT on any rsb's that we should attempt to grant
+ * locks for.
+ */
+
+void dlm_recover_grant(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
        int bucket = 0;
+       unsigned int count = 0;
+       unsigned int rsb_count = 0;
+       unsigned int lkb_count = 0;
 
        while (1) {
-               r = find_purged_rsb(ls, bucket);
+               r = find_grant_rsb(ls, bucket);
                if (!r) {
                        if (bucket == ls->ls_rsbtbl_size - 1)
                                break;
                        bucket++;
                        continue;
                }
+               rsb_count++;
+               count = 0;
                lock_rsb(r);
-               if (is_master(r)) {
-                       grant_pending_locks(r);
-                       confirm_master(r, 0);
-               }
+               grant_pending_locks(r, &count);
+               lkb_count += count;
+               confirm_master(r, 0);
                unlock_rsb(r);
                put_rsb(r);
-               schedule();
+               cond_resched();
        }
+
+       if (lkb_count)
+               log_debug(ls, "dlm_recover_grant %u locks on %u resources",
+                         lkb_count, rsb_count);
 }
 
 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
@@ -4631,6 +4797,7 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
+       uint32_t remid = 0;
        int error;
 
        if (rl->rl_parent_lkid) {
@@ -4638,14 +4805,31 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
                goto out;
        }
 
-       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
-                        R_MASTER, &r);
+       remid = le32_to_cpu(rl->rl_lkid);
+
+       /* In general we expect the rsb returned to be R_MASTER, but we don't
+          have to require it.  Recovery of masters on one node can overlap
+          recovery of locks on another node, so one node can send us MSTCPY
+          locks before we've made ourselves master of this rsb.  We can still
+          add new MSTCPY locks that we receive here without any harm; when
+          we make ourselves master, dlm_recover_masters() won't touch the
+          MSTCPY locks we've received early. */
+
+       error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
        if (error)
                goto out;
 
+       if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
+               log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
+                         rc->rc_header.h_nodeid, remid);
+               error = -EBADR;
+               put_rsb(r);
+               goto out;
+       }
+
        lock_rsb(r);
 
-       lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
+       lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
        if (lkb) {
                error = -EEXIST;
                goto out_remid;
@@ -4664,19 +4848,25 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        attach_lkb(r, lkb);
        add_lkb(r, lkb, rl->rl_status);
        error = 0;
+       ls->ls_recover_locks_in++;
+
+       if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+               rsb_set_flag(r, RSB_RECOVER_GRANT);
 
  out_remid:
        /* this is the new value returned to the lock holder for
           saving in its process-copy lkb */
        rl->rl_remid = cpu_to_le32(lkb->lkb_id);
 
+       lkb->lkb_recover_seq = ls->ls_recover_seq;
+
  out_unlock:
        unlock_rsb(r);
        put_rsb(r);
  out:
-       if (error)
-               log_debug(ls, "recover_master_copy %d %x", error,
-                         le32_to_cpu(rl->rl_lkid));
+       if (error && error != -EEXIST)
+               log_debug(ls, "dlm_recover_master_copy remote %d %x error %d",
+                         rc->rc_header.h_nodeid, remid, error);
        rl->rl_result = cpu_to_le32(error);
        return error;
 }
@@ -4687,41 +4877,52 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
        struct dlm_rsb *r;
        struct dlm_lkb *lkb;
-       int error;
+       uint32_t lkid, remid;
+       int error, result;
+
+       lkid = le32_to_cpu(rl->rl_lkid);
+       remid = le32_to_cpu(rl->rl_remid);
+       result = le32_to_cpu(rl->rl_result);
 
-       error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
+       error = find_lkb(ls, lkid, &lkb);
        if (error) {
-               log_error(ls, "recover_process_copy no lkid %x",
-                               le32_to_cpu(rl->rl_lkid));
+               log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
                return error;
        }
 
-       DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
-
-       error = le32_to_cpu(rl->rl_result);
-
        r = lkb->lkb_resource;
        hold_rsb(r);
        lock_rsb(r);
 
-       switch (error) {
+       if (!is_process_copy(lkb)) {
+               log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
+               dlm_dump_rsb(r);
+               unlock_rsb(r);
+               put_rsb(r);
+               dlm_put_lkb(lkb);
+               return -EINVAL;
+       }
+
+       switch (result) {
        case -EBADR:
                /* There's a chance the new master received our lock before
                   dlm_recover_master_reply(), this wouldn't happen if we did
                   a barrier between recover_masters and recover_locks. */
-               log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
-                         (unsigned long)r, r->res_name);
+
+               log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
+       
                dlm_send_rcom_lock(r, lkb);
                goto out;
        case -EEXIST:
-               log_debug(ls, "master copy exists %x", lkb->lkb_id);
-               /* fall through */
        case 0:
-               lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
+               lkb->lkb_remid = remid;
                break;
        default:
-               log_error(ls, "dlm_recover_process_copy unknown error %d %x",
-                         error, lkb->lkb_id);
+               log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
+                         lkid, rc->rc_header.h_nodeid, remid, result);
        }
 
        /* an ack for dlm_recover_locks() which waits for replies from
index 1a255307f6ff68bd1279d6ddc14c851899e1bb02..c8b226c62807a5bf0d4b9d6aebfc4e2b29aa13be 100644 (file)
@@ -15,7 +15,8 @@
 
 void dlm_dump_rsb(struct dlm_rsb *r);
 void dlm_print_lkb(struct dlm_lkb *lkb);
-void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms);
+void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
+                              uint32_t saved_seq);
 void dlm_receive_buffer(union dlm_packet *p, int nodeid);
 int dlm_modes_compat(int mode1, int mode2);
 void dlm_put_rsb(struct dlm_rsb *r);
@@ -31,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls);
 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
                        unsigned int flags, struct dlm_rsb **r_ret);
 
-int dlm_purge_locks(struct dlm_ls *ls);
+void dlm_recover_purge(struct dlm_ls *ls);
 void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
-void dlm_grant_after_purge(struct dlm_ls *ls);
+void dlm_recover_grant(struct dlm_ls *ls);
 int dlm_recover_waiters_post(struct dlm_ls *ls);
 void dlm_recover_waiters_pre(struct dlm_ls *ls);
 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
index a1ea25face828239a3434d6e036f7acffc9cf092..ca506abbdd3b88abdc990e9df6c7dc5b369367b5 100644 (file)
@@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
        return len;
 }
 
+static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
+{
+       return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
+}
+
+static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
+{
+       int val = simple_strtoul(buf, NULL, 0);
+       if (val == 1)
+               set_bit(LSFL_NODIR, &ls->ls_flags);
+       return len;
+}
+
 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
 {
        uint32_t status = dlm_recover_status(ls);
@@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = {
        .store = dlm_id_store
 };
 
+static struct dlm_attr dlm_attr_nodir = {
+       .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
+       .show  = dlm_nodir_show,
+       .store = dlm_nodir_store
+};
+
 static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
@@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
+       &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
index 133ef6dc7cb790129cc0c9c3c64b35901f8c1d50..5c1b0e38c7a4c7d7dd0865da4c09240a1e880443 100644 (file)
@@ -142,6 +142,7 @@ struct writequeue_entry {
 
 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
 static int dlm_local_count;
+static int dlm_allow_conn;
 
 /* Work queues */
 static struct workqueue_struct *recv_workqueue;
@@ -710,6 +711,13 @@ static int tcp_accept_from_sock(struct connection *con)
        struct connection *newcon;
        struct connection *addcon;
 
+       mutex_lock(&connections_lock);
+       if (!dlm_allow_conn) {
+               mutex_unlock(&connections_lock);
+               return -1;
+       }
+       mutex_unlock(&connections_lock);
+
        memset(&peeraddr, 0, sizeof(peeraddr));
        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &newsock);
@@ -1503,6 +1511,7 @@ void dlm_lowcomms_stop(void)
           socket activity.
        */
        mutex_lock(&connections_lock);
+       dlm_allow_conn = 0;
        foreach_conn(stop_conn);
        mutex_unlock(&connections_lock);
 
@@ -1530,7 +1539,7 @@ int dlm_lowcomms_start(void)
        if (!dlm_local_count) {
                error = -ENOTCONN;
                log_print("no local IP address has been set");
-               goto out;
+               goto fail;
        }
 
        error = -ENOMEM;
@@ -1538,7 +1547,13 @@ int dlm_lowcomms_start(void)
                                      __alignof__(struct connection), 0,
                                      NULL);
        if (!con_cache)
-               goto out;
+               goto fail;
+
+       error = work_start();
+       if (error)
+               goto fail_destroy;
+
+       dlm_allow_conn = 1;
 
        /* Start listening */
        if (dlm_config.ci_protocol == 0)
@@ -1548,20 +1563,17 @@ int dlm_lowcomms_start(void)
        if (error)
                goto fail_unlisten;
 
-       error = work_start();
-       if (error)
-               goto fail_unlisten;
-
        return 0;
 
 fail_unlisten:
+       dlm_allow_conn = 0;
        con = nodeid2con(0,0);
        if (con) {
                close_connection(con, false);
                kmem_cache_free(con_cache, con);
        }
+fail_destroy:
        kmem_cache_destroy(con_cache);
-
-out:
+fail:
        return error;
 }
index da64df7576e18f5b5386aa3d9d5dde9465bba0be..7cd24bccd4fe56aab54db611c4587a8bdc51a7e8 100644 (file)
@@ -21,21 +21,19 @@ static struct kmem_cache *rsb_cache;
 
 int __init dlm_memory_init(void)
 {
-       int ret = 0;
-
        lkb_cache = kmem_cache_create("dlm_lkb", sizeof(struct dlm_lkb),
                                __alignof__(struct dlm_lkb), 0, NULL);
        if (!lkb_cache)
-               ret = -ENOMEM;
+               return -ENOMEM;
 
        rsb_cache = kmem_cache_create("dlm_rsb", sizeof(struct dlm_rsb),
                                __alignof__(struct dlm_rsb), 0, NULL);
        if (!rsb_cache) {
                kmem_cache_destroy(lkb_cache);
-               ret = -ENOMEM;
+               return -ENOMEM;
        }
 
-       return ret;
+       return 0;
 }
 
 void dlm_memory_exit(void)
index ac5c616c969643addc81c8920f4da2cf439356e9..64d3e2b958c7874982e042472c36f00ab617f6bf 100644 (file)
@@ -486,47 +486,50 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
        return 0;
 }
 
-static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+/* Called by dlm_recv; corresponds to dlm_receive_message() but special
+   recovery-only comms are sent through here. */
+
+void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
+       int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
+       int stop, reply = 0, lock = 0;
+       uint32_t status;
        uint64_t seq;
-       int rv = 0;
 
        switch (rc->rc_type) {
+       case DLM_RCOM_LOCK:
+               lock = 1;
+               break;
+       case DLM_RCOM_LOCK_REPLY:
+               lock = 1;
+               reply = 1;
+               break;
        case DLM_RCOM_STATUS_REPLY:
        case DLM_RCOM_NAMES_REPLY:
        case DLM_RCOM_LOOKUP_REPLY:
-       case DLM_RCOM_LOCK_REPLY:
-               spin_lock(&ls->ls_recover_lock);
-               seq = ls->ls_recover_seq;
-               spin_unlock(&ls->ls_recover_lock);
-               if (rc->rc_seq_reply != seq) {
-                       log_debug(ls, "ignoring old reply %x from %d "
-                                     "seq_reply %llx expect %llx",
-                                     rc->rc_type, rc->rc_header.h_nodeid,
-                                     (unsigned long long)rc->rc_seq_reply,
-                                     (unsigned long long)seq);
-                       rv = 1;
-               }
-       }
-       return rv;
-}
-
-/* Called by dlm_recv; corresponds to dlm_receive_message() but special
-   recovery-only comms are sent through here. */
+               reply = 1;
+       };
 
-void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
-{
-       int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
+       spin_lock(&ls->ls_recover_lock);
+       status = ls->ls_recover_status;
+       stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+       seq = ls->ls_recover_seq;
+       spin_unlock(&ls->ls_recover_lock);
 
-       if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
-               log_debug(ls, "ignoring recovery message %x from %d",
-                         rc->rc_type, nodeid);
+       if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
+           (reply && (rc->rc_seq_reply != seq)) ||
+           (lock && !(status & DLM_RS_DIR))) {
+               log_limit(ls, "dlm_receive_rcom ignore msg %d "
+                         "from %d %llu %llu recover seq %llu sts %x gen %u",
+                          rc->rc_type,
+                          nodeid,
+                          (unsigned long long)rc->rc_seq,
+                          (unsigned long long)rc->rc_seq_reply,
+                          (unsigned long long)seq,
+                          status, ls->ls_generation);
                goto out;
        }
 
-       if (is_old_reply(ls, rc))
-               goto out;
-
        switch (rc->rc_type) {
        case DLM_RCOM_STATUS:
                receive_rcom_status(ls, rc);
index 34d5adf1fce7d2022679d42e1e00673af0825dae..7554e4dac6bbbc0290ac26a1a75438a1551d5060 100644 (file)
@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
 {
        struct dlm_lkb *lkb;
 
-       list_for_each_entry(lkb, queue, lkb_statequeue)
-               if (!(lkb->lkb_flags & DLM_IFL_MSTCPY))
+       list_for_each_entry(lkb, queue, lkb_statequeue) {
+               if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
                        lkb->lkb_nodeid = nodeid;
+                       lkb->lkb_remid = 0;
+               }
+       }
 }
 
 static void set_master_lkbs(struct dlm_rsb *r)
@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
 /*
  * Propagate the new master nodeid to locks
  * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
+ * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
  * rsb's to consider.
  */
 
 static void set_new_master(struct dlm_rsb *r, int nodeid)
 {
-       lock_rsb(r);
        r->res_nodeid = nodeid;
        set_master_lkbs(r);
        rsb_set_flag(r, RSB_NEW_MASTER);
        rsb_set_flag(r, RSB_NEW_MASTER2);
-       unlock_rsb(r);
 }
 
 /*
@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
 static int recover_master(struct dlm_rsb *r)
 {
        struct dlm_ls *ls = r->res_ls;
-       int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
-
-       dir_nodeid = dlm_dir_nodeid(r);
+       int error, ret_nodeid;
+       int our_nodeid = dlm_our_nodeid();
+       int dir_nodeid = dlm_dir_nodeid(r);
 
        if (dir_nodeid == our_nodeid) {
                error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
 
                if (ret_nodeid == our_nodeid)
                        ret_nodeid = 0;
+               lock_rsb(r);
                set_new_master(r, ret_nodeid);
+               unlock_rsb(r);
        } else {
                recover_list_add(r);
                error = dlm_send_rcom_lookup(r, dir_nodeid);
@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
 }
 
 /*
- * When not using a directory, most resource names will hash to a new static
- * master nodeid and the resource will need to be remastered.
+ * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
+ * This is necessary because recovery can be started, aborted and restarted,
+ * causing the master nodeid to briefly change during the aborted recovery, and
+ * change back to the original value in the second recovery.  The MSTCPY locks
+ * may or may not have been purged during the aborted recovery.  Another node
+ * with an outstanding request in waiters list and a request reply saved in the
+ * requestqueue, cannot know whether it should ignore the reply and resend the
+ * request, or accept the reply and complete the request.  It must do the
+ * former if the remote node purged MSTCPY locks, and it must do the latter if
+ * the remote node did not.  This is solved by always purging MSTCPY locks, in
+ * which case, the request reply would always be ignored and the request
+ * resent.
  */
 
 static int recover_master_static(struct dlm_rsb *r)
 {
-       int master = dlm_dir_nodeid(r);
+       int dir_nodeid = dlm_dir_nodeid(r);
+       int new_master = dir_nodeid;
 
-       if (master == dlm_our_nodeid())
-               master = 0;
+       if (dir_nodeid == dlm_our_nodeid())
+               new_master = 0;
 
-       if (r->res_nodeid != master) {
-               if (is_master(r))
-                       dlm_purge_mstcpy_locks(r);
-               set_new_master(r, master);
-               return 1;
-       }
-       return 0;
+       lock_rsb(r);
+       dlm_purge_mstcpy_locks(r);
+       set_new_master(r, new_master);
+       unlock_rsb(r);
+       return 1;
 }
 
 /*
@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
        if (nodeid == dlm_our_nodeid())
                nodeid = 0;
 
+       lock_rsb(r);
        set_new_master(r, nodeid);
+       unlock_rsb(r);
        recover_list_del(r);
 
        if (recover_list_empty(ls))
@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
        struct dlm_rsb *r;
        int error, count = 0;
 
-       log_debug(ls, "dlm_recover_locks");
-
        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
                if (is_master(r)) {
@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_locks %d locks", count);
+       log_debug(ls, "dlm_recover_locks %d out", count);
 
        error = dlm_wait_function(ls, &recover_list_empty);
  out:
@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
 }
 
 /* We've become the new master for this rsb and waiting/converting locks may
-   need to be granted in dlm_grant_after_purge() due to locks that may have
+   need to be granted in dlm_recover_grant() due to locks that may have
    existed from a removed node. */
 
-static void set_locks_purged(struct dlm_rsb *r)
+static void recover_grant(struct dlm_rsb *r)
 {
        if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
-               rsb_set_flag(r, RSB_LOCKS_PURGED);
+               rsb_set_flag(r, RSB_RECOVER_GRANT);
 }
 
 void dlm_recover_rsbs(struct dlm_ls *ls)
 {
        struct dlm_rsb *r;
-       int count = 0;
-
-       log_debug(ls, "dlm_recover_rsbs");
+       unsigned int count = 0;
 
        down_read(&ls->ls_root_sem);
        list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
                        if (rsb_flag(r, RSB_RECOVER_CONVERT))
                                recover_conversion(r);
                        if (rsb_flag(r, RSB_NEW_MASTER2))
-                               set_locks_purged(r);
+                               recover_grant(r);
                        recover_lvb(r);
                        count++;
                }
@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
        }
        up_read(&ls->ls_root_sem);
 
-       log_debug(ls, "dlm_recover_rsbs %d rsbs", count);
+       if (count)
+               log_debug(ls, "dlm_recover_rsbs %d done", count);
 }
 
 /* Create a single list of all root rsb's to be used during recovery */
index 3780caf7ae0c239776951724c8d951dca5aef5dd..f1a9073c0835e0ecce2e896f5a59799e3604f857 100644 (file)
@@ -54,7 +54,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
        unsigned long start;
        int error, neg = 0;
 
-       log_debug(ls, "dlm_recover %llx", (unsigned long long)rv->seq);
+       log_debug(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
 
        mutex_lock(&ls->ls_recoverd_active);
 
@@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                goto fail;
        }
 
+       ls->ls_recover_locks_in = 0;
+
        dlm_set_recover_status(ls, DLM_RS_NODES);
 
        error = dlm_recover_members_wait(ls);
@@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                 * Clear lkb's for departed nodes.
                 */
 
-               dlm_purge_locks(ls);
+               dlm_recover_purge(ls);
 
                /*
                 * Get new master nodeid's for rsb's that were mastered on
@@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                        goto fail;
                }
 
+               log_debug(ls, "dlm_recover_locks %u in",
+                         ls->ls_recover_locks_in);
+
                /*
                 * Finalize state in master rsb's now that all locks can be
                 * checked.  This includes conversion resolution and lvb
@@ -225,9 +230,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
                goto fail;
        }
 
-       dlm_grant_after_purge(ls);
+       dlm_recover_grant(ls);
 
-       log_debug(ls, "dlm_recover %llx generation %u done: %u ms",
+       log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
                  (unsigned long long)rv->seq, ls->ls_generation,
                  jiffies_to_msecs(jiffies - start));
        mutex_unlock(&ls->ls_recoverd_active);
@@ -237,7 +242,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
  fail:
        dlm_release_root_list(ls);
-       log_debug(ls, "dlm_recover %llx error %d",
+       log_debug(ls, "dlm_recover %llu error %d",
                  (unsigned long long)rv->seq, error);
        mutex_unlock(&ls->ls_recoverd_active);
        return error;
index a44fa22890e1dd06797ae5e43e77a86c0cf4ca63..1695f1b0dd456f84c0a4c989069296629b5ea37a 100644 (file)
@@ -19,6 +19,7 @@
 
 struct rq_entry {
        struct list_head list;
+       uint32_t recover_seq;
        int nodeid;
        struct dlm_message request;
 };
@@ -41,6 +42,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
                return;
        }
 
+       e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
        e->nodeid = nodeid;
        memcpy(&e->request, ms, ms->m_header.h_length);
 
@@ -63,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
 int dlm_process_requestqueue(struct dlm_ls *ls)
 {
        struct rq_entry *e;
+       struct dlm_message *ms;
        int error = 0;
 
        mutex_lock(&ls->ls_requestqueue_mutex);
@@ -76,7 +79,15 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
                e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
                mutex_unlock(&ls->ls_requestqueue_mutex);
 
-               dlm_receive_message_saved(ls, &e->request);
+               ms = &e->request;
+
+               log_limit(ls, "dlm_process_requestqueue msg %d from %d "
+                         "lkid %x remid %x result %d seq %u",
+                         ms->m_type, ms->m_header.h_nodeid,
+                         ms->m_lkid, ms->m_remid, ms->m_result,
+                         e->recover_seq);
+
+               dlm_receive_message_saved(ls, &e->request, e->recover_seq);
 
                mutex_lock(&ls->ls_requestqueue_mutex);
                list_del(&e->list);
@@ -138,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
        if (!dlm_no_directory(ls))
                return 0;
 
-       /* with no directory, the master is likely to change as a part of
-          recovery; requests to/from the defunct master need to be purged */
-
-       switch (type) {
-       case DLM_MSG_REQUEST:
-       case DLM_MSG_CONVERT:
-       case DLM_MSG_UNLOCK:
-       case DLM_MSG_CANCEL:
-               /* we're no longer the master of this resource, the sender
-                  will resend to the new master (see waiter_needs_recovery) */
-
-               if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
-                       return 1;
-               break;
-
-       case DLM_MSG_REQUEST_REPLY:
-       case DLM_MSG_CONVERT_REPLY:
-       case DLM_MSG_UNLOCK_REPLY:
-       case DLM_MSG_CANCEL_REPLY:
-       case DLM_MSG_GRANT:
-               /* this reply is from the former master of the resource,
-                  we'll resend to the new master if needed */
-
-               if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
-                       return 1;
-               break;
-       }
-
-       return 0;
+       return 1;
 }
 
 void dlm_purge_requestqueue(struct dlm_ls *ls)
index aa9949e5de26e74eadd9dd290169d9ae45b5310f..67fd6beffeced9b7a91733d52d39ba5e8aec95f5 100644 (file)
@@ -543,7 +543,6 @@ struct gfs2_sb_host {
 struct lm_lockstruct {
        int ls_jid;
        unsigned int ls_first;
-       unsigned int ls_nodir;
        const struct lm_lockops *ls_ops;
        dlm_lockspace_t *ls_dlm;
 
index 5f5e70e047dc73440ab88f1c08adc51938d7930b..4a38db739ca0a5e725a40fd39d9ebc4ff96ea34b 100644 (file)
@@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
        fsname++;
 
        flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
-       if (ls->ls_nodir)
-               flags |= DLM_LSFL_NODIR;
 
        /*
         * create/join lockspace
index c5871ae4056185dc687ea0bc6b956c5897b5cd7a..b8c250fc4922e8dd501a0c12c1481f7d1a32e0d8 100644 (file)
@@ -993,6 +993,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
                                ls->ls_jid = option;
                        break;
                case Opt_id:
+               case Opt_nodir:
                        /* Obsolete, but left for backward compat purposes */
                        break;
                case Opt_first:
@@ -1001,12 +1002,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
                                goto hostdata_error;
                        ls->ls_first = option;
                        break;
-               case Opt_nodir:
-                       ret = match_int(&tmp[0], &option);
-                       if (ret || (option != 0 && option != 1))
-                               goto hostdata_error;
-                       ls->ls_nodir = option;
-                       break;
                case Opt_err:
                default:
 hostdata_error:
index d33172c291bad63064dba6eebdddc8daba16b3f6..9c2592b1d5ff74ab0e5c2f800b0d0e4508119021 100644 (file)
@@ -368,10 +368,7 @@ int gfs2_recover_set(struct gfs2_sbd *sdp, unsigned jid)
        struct gfs2_jdesc *jd;
        int rv;
 
-       rv = -ESHUTDOWN;
        spin_lock(&sdp->sd_jindex_spin);
-       if (test_bit(SDF_NORECOVERY, &sdp->sd_flags))
-               goto out;
        rv = -EBUSY;
        if (sdp->sd_jdesc->jd_jid == jid)
                goto out;
@@ -396,8 +393,13 @@ static ssize_t recover_store(struct gfs2_sbd *sdp, const char *buf, size_t len)
        if (rv != 1)
                return -EINVAL;
 
-       rv = gfs2_recover_set(sdp, jid);
+       if (test_bit(SDF_NORECOVERY, &sdp->sd_flags)) {
+               rv = -ESHUTDOWN;
+               goto out;
+       }
 
+       rv = gfs2_recover_set(sdp, jid);
+out:
        return rv ? rv : len;
 }
 
index 6c7f6e9546c7fde54a88228ebc52144e6bd27692..520152411cd111fc2c0b94dbbf04069465de206c 100644 (file)
@@ -67,7 +67,6 @@ struct dlm_lksb {
 
 /* dlm_new_lockspace() flags */
 
-#define DLM_LSFL_NODIR         0x00000001
 #define DLM_LSFL_TIMEWARN      0x00000002
 #define DLM_LSFL_FS            0x00000004
 #define DLM_LSFL_NEWEXCL       0x00000008