Merge tag 'drm-intel-next-fixes-2014-12-30' of git://anongit.freedesktop.org/drm...
[firefly-linux-kernel-4.4.55.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 r = NULL;
691                 error = -ENOTBLK;
692                 goto out_unlock;
693         }
694
695         if (from_other) {
696                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
697                           from_nodeid, dir_nodeid, r->res_name);
698         }
699
700         if (dir_nodeid == our_nodeid) {
701                 /* When we are the dir nodeid, we can set the master
702                    node immediately */
703                 r->res_master_nodeid = our_nodeid;
704                 r->res_nodeid = 0;
705         } else {
706                 /* set_master will send_lookup to dir_nodeid */
707                 r->res_master_nodeid = 0;
708                 r->res_nodeid = -1;
709         }
710
711  out_add:
712         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
713  out_unlock:
714         spin_unlock(&ls->ls_rsbtbl[b].lock);
715  out:
716         *r_ret = r;
717         return error;
718 }
719
720 /* During recovery, other nodes can send us new MSTCPY locks (from
721    dlm_recover_locks) before we've made ourself master (in
722    dlm_recover_masters). */
723
724 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
725                           uint32_t hash, uint32_t b,
726                           int dir_nodeid, int from_nodeid,
727                           unsigned int flags, struct dlm_rsb **r_ret)
728 {
729         struct dlm_rsb *r = NULL;
730         int our_nodeid = dlm_our_nodeid();
731         int recover = (flags & R_RECEIVE_RECOVER);
732         int error;
733
734  retry:
735         error = pre_rsb_struct(ls);
736         if (error < 0)
737                 goto out;
738
739         spin_lock(&ls->ls_rsbtbl[b].lock);
740
741         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
742         if (error)
743                 goto do_toss;
744
745         /*
746          * rsb is active, so we can't check master_nodeid without lock_rsb.
747          */
748
749         kref_get(&r->res_ref);
750         goto out_unlock;
751
752
753  do_toss:
754         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
755         if (error)
756                 goto do_new;
757
758         /*
759          * rsb found inactive. No other thread is using this rsb because
760          * it's on the toss list, so we can look at or update
761          * res_master_nodeid without lock_rsb.
762          */
763
764         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
765                 /* our rsb is not master, and another node has sent us a
766                    request; this should never happen */
767                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
768                           from_nodeid, r->res_master_nodeid, dir_nodeid);
769                 dlm_print_rsb(r);
770                 error = -ENOTBLK;
771                 goto out_unlock;
772         }
773
774         if (!recover && (r->res_master_nodeid != our_nodeid) &&
775             (dir_nodeid == our_nodeid)) {
776                 /* our rsb is not master, and we are dir; may as well fix it;
777                    this should never happen */
778                 log_error(ls, "find_rsb toss our %d master %d dir %d",
779                           our_nodeid, r->res_master_nodeid, dir_nodeid);
780                 dlm_print_rsb(r);
781                 r->res_master_nodeid = our_nodeid;
782                 r->res_nodeid = 0;
783         }
784
785         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
786         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
787         goto out_unlock;
788
789
790  do_new:
791         /*
792          * rsb not found
793          */
794
795         error = get_rsb_struct(ls, name, len, &r);
796         if (error == -EAGAIN) {
797                 spin_unlock(&ls->ls_rsbtbl[b].lock);
798                 goto retry;
799         }
800         if (error)
801                 goto out_unlock;
802
803         r->res_hash = hash;
804         r->res_bucket = b;
805         r->res_dir_nodeid = dir_nodeid;
806         r->res_master_nodeid = dir_nodeid;
807         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
808         kref_init(&r->res_ref);
809
810         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
811  out_unlock:
812         spin_unlock(&ls->ls_rsbtbl[b].lock);
813  out:
814         *r_ret = r;
815         return error;
816 }
817
818 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
819                     unsigned int flags, struct dlm_rsb **r_ret)
820 {
821         uint32_t hash, b;
822         int dir_nodeid;
823
824         if (len > DLM_RESNAME_MAXLEN)
825                 return -EINVAL;
826
827         hash = jhash(name, len, 0);
828         b = hash & (ls->ls_rsbtbl_size - 1);
829
830         dir_nodeid = dlm_hash2nodeid(ls, hash);
831
832         if (dlm_no_directory(ls))
833                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
834                                       from_nodeid, flags, r_ret);
835         else
836                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
837                                       from_nodeid, flags, r_ret);
838 }
839
840 /* we have received a request and found that res_master_nodeid != our_nodeid,
841    so we need to return an error or make ourself the master */
842
843 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
844                                   int from_nodeid)
845 {
846         if (dlm_no_directory(ls)) {
847                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
848                           from_nodeid, r->res_master_nodeid,
849                           r->res_dir_nodeid);
850                 dlm_print_rsb(r);
851                 return -ENOTBLK;
852         }
853
854         if (from_nodeid != r->res_dir_nodeid) {
855                 /* our rsb is not master, and another node (not the dir node)
856                    has sent us a request.  this is much more common when our
857                    master_nodeid is zero, so limit debug to non-zero.  */
858
859                 if (r->res_master_nodeid) {
860                         log_debug(ls, "validate master from_other %d master %d "
861                                   "dir %d first %x %s", from_nodeid,
862                                   r->res_master_nodeid, r->res_dir_nodeid,
863                                   r->res_first_lkid, r->res_name);
864                 }
865                 return -ENOTBLK;
866         } else {
867                 /* our rsb is not master, but the dir nodeid has sent us a
868                    request; this could happen with master 0 / res_nodeid -1 */
869
870                 if (r->res_master_nodeid) {
871                         log_error(ls, "validate master from_dir %d master %d "
872                                   "first %x %s",
873                                   from_nodeid, r->res_master_nodeid,
874                                   r->res_first_lkid, r->res_name);
875                 }
876
877                 r->res_master_nodeid = dlm_our_nodeid();
878                 r->res_nodeid = 0;
879                 return 0;
880         }
881 }
882
883 /*
884  * We're the dir node for this res and another node wants to know the
885  * master nodeid.  During normal operation (non recovery) this is only
886  * called from receive_lookup(); master lookups when the local node is
887  * the dir node are done by find_rsb().
888  *
889  * normal operation, we are the dir node for a resource
890  * . _request_lock
891  * . set_master
892  * . send_lookup
893  * . receive_lookup
894  * . dlm_master_lookup flags 0
895  *
896  * recover directory, we are rebuilding dir for all resources
897  * . dlm_recover_directory
898  * . dlm_rcom_names
899  *   remote node sends back the rsb names it is master of and we are dir of
900  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
901  *   we either create new rsb setting remote node as master, or find existing
902  *   rsb and set master to be the remote node.
903  *
904  * recover masters, we are finding the new master for resources
905  * . dlm_recover_masters
906  * . recover_master
907  * . dlm_send_rcom_lookup
908  * . receive_rcom_lookup
909  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
910  */
911
912 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
913                       unsigned int flags, int *r_nodeid, int *result)
914 {
915         struct dlm_rsb *r = NULL;
916         uint32_t hash, b;
917         int from_master = (flags & DLM_LU_RECOVER_DIR);
918         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
919         int our_nodeid = dlm_our_nodeid();
920         int dir_nodeid, error, toss_list = 0;
921
922         if (len > DLM_RESNAME_MAXLEN)
923                 return -EINVAL;
924
925         if (from_nodeid == our_nodeid) {
926                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
927                           our_nodeid, flags);
928                 return -EINVAL;
929         }
930
931         hash = jhash(name, len, 0);
932         b = hash & (ls->ls_rsbtbl_size - 1);
933
934         dir_nodeid = dlm_hash2nodeid(ls, hash);
935         if (dir_nodeid != our_nodeid) {
936                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
937                           from_nodeid, dir_nodeid, our_nodeid, hash,
938                           ls->ls_num_nodes);
939                 *r_nodeid = -1;
940                 return -EINVAL;
941         }
942
943  retry:
944         error = pre_rsb_struct(ls);
945         if (error < 0)
946                 return error;
947
948         spin_lock(&ls->ls_rsbtbl[b].lock);
949         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
950         if (!error) {
951                 /* because the rsb is active, we need to lock_rsb before
952                    checking/changing re_master_nodeid */
953
954                 hold_rsb(r);
955                 spin_unlock(&ls->ls_rsbtbl[b].lock);
956                 lock_rsb(r);
957                 goto found;
958         }
959
960         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
961         if (error)
962                 goto not_found;
963
964         /* because the rsb is inactive (on toss list), it's not refcounted
965            and lock_rsb is not used, but is protected by the rsbtbl lock */
966
967         toss_list = 1;
968  found:
969         if (r->res_dir_nodeid != our_nodeid) {
970                 /* should not happen, but may as well fix it and carry on */
971                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
972                           r->res_dir_nodeid, our_nodeid, r->res_name);
973                 r->res_dir_nodeid = our_nodeid;
974         }
975
976         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
977                 /* Recovery uses this function to set a new master when
978                    the previous master failed.  Setting NEW_MASTER will
979                    force dlm_recover_masters to call recover_master on this
980                    rsb even though the res_nodeid is no longer removed. */
981
982                 r->res_master_nodeid = from_nodeid;
983                 r->res_nodeid = from_nodeid;
984                 rsb_set_flag(r, RSB_NEW_MASTER);
985
986                 if (toss_list) {
987                         /* I don't think we should ever find it on toss list. */
988                         log_error(ls, "dlm_master_lookup fix_master on toss");
989                         dlm_dump_rsb(r);
990                 }
991         }
992
993         if (from_master && (r->res_master_nodeid != from_nodeid)) {
994                 /* this will happen if from_nodeid became master during
995                    a previous recovery cycle, and we aborted the previous
996                    cycle before recovering this master value */
997
998                 log_limit(ls, "dlm_master_lookup from_master %d "
999                           "master_nodeid %d res_nodeid %d first %x %s",
1000                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1001                           r->res_first_lkid, r->res_name);
1002
1003                 if (r->res_master_nodeid == our_nodeid) {
1004                         log_error(ls, "from_master %d our_master", from_nodeid);
1005                         dlm_dump_rsb(r);
1006                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1007                         goto out_found;
1008                 }
1009
1010                 r->res_master_nodeid = from_nodeid;
1011                 r->res_nodeid = from_nodeid;
1012                 rsb_set_flag(r, RSB_NEW_MASTER);
1013         }
1014
1015         if (!r->res_master_nodeid) {
1016                 /* this will happen if recovery happens while we're looking
1017                    up the master for this rsb */
1018
1019                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1020                           from_nodeid, r->res_first_lkid, r->res_name);
1021                 r->res_master_nodeid = from_nodeid;
1022                 r->res_nodeid = from_nodeid;
1023         }
1024
1025         if (!from_master && !fix_master &&
1026             (r->res_master_nodeid == from_nodeid)) {
1027                 /* this can happen when the master sends remove, the dir node
1028                    finds the rsb on the keep list and ignores the remove,
1029                    and the former master sends a lookup */
1030
1031                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1032                           "first %x %s", from_nodeid, flags,
1033                           r->res_first_lkid, r->res_name);
1034         }
1035
1036  out_found:
1037         *r_nodeid = r->res_master_nodeid;
1038         if (result)
1039                 *result = DLM_LU_MATCH;
1040
1041         if (toss_list) {
1042                 r->res_toss_time = jiffies;
1043                 /* the rsb was inactive (on toss list) */
1044                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1045         } else {
1046                 /* the rsb was active */
1047                 unlock_rsb(r);
1048                 put_rsb(r);
1049         }
1050         return 0;
1051
1052  not_found:
1053         error = get_rsb_struct(ls, name, len, &r);
1054         if (error == -EAGAIN) {
1055                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1056                 goto retry;
1057         }
1058         if (error)
1059                 goto out_unlock;
1060
1061         r->res_hash = hash;
1062         r->res_bucket = b;
1063         r->res_dir_nodeid = our_nodeid;
1064         r->res_master_nodeid = from_nodeid;
1065         r->res_nodeid = from_nodeid;
1066         kref_init(&r->res_ref);
1067         r->res_toss_time = jiffies;
1068
1069         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1070         if (error) {
1071                 /* should never happen */
1072                 dlm_free_rsb(r);
1073                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1074                 goto retry;
1075         }
1076
1077         if (result)
1078                 *result = DLM_LU_ADD;
1079         *r_nodeid = from_nodeid;
1080         error = 0;
1081  out_unlock:
1082         spin_unlock(&ls->ls_rsbtbl[b].lock);
1083         return error;
1084 }
1085
1086 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1087 {
1088         struct rb_node *n;
1089         struct dlm_rsb *r;
1090         int i;
1091
1092         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1093                 spin_lock(&ls->ls_rsbtbl[i].lock);
1094                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1095                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1096                         if (r->res_hash == hash)
1097                                 dlm_dump_rsb(r);
1098                 }
1099                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1100         }
1101 }
1102
1103 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1104 {
1105         struct dlm_rsb *r = NULL;
1106         uint32_t hash, b;
1107         int error;
1108
1109         hash = jhash(name, len, 0);
1110         b = hash & (ls->ls_rsbtbl_size - 1);
1111
1112         spin_lock(&ls->ls_rsbtbl[b].lock);
1113         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1114         if (!error)
1115                 goto out_dump;
1116
1117         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1118         if (error)
1119                 goto out;
1120  out_dump:
1121         dlm_dump_rsb(r);
1122  out:
1123         spin_unlock(&ls->ls_rsbtbl[b].lock);
1124 }
1125
1126 static void toss_rsb(struct kref *kref)
1127 {
1128         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1129         struct dlm_ls *ls = r->res_ls;
1130
1131         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1132         kref_init(&r->res_ref);
1133         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1134         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1135         r->res_toss_time = jiffies;
1136         ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1137         if (r->res_lvbptr) {
1138                 dlm_free_lvb(r->res_lvbptr);
1139                 r->res_lvbptr = NULL;
1140         }
1141 }
1142
1143 /* See comment for unhold_lkb */
1144
1145 static void unhold_rsb(struct dlm_rsb *r)
1146 {
1147         int rv;
1148         rv = kref_put(&r->res_ref, toss_rsb);
1149         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1150 }
1151
1152 static void kill_rsb(struct kref *kref)
1153 {
1154         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1155
1156         /* All work is done after the return from kref_put() so we
1157            can release the write_lock before the remove and free. */
1158
1159         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1163         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1164         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1165 }
1166
1167 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1168    The rsb must exist as long as any lkb's for it do. */
1169
1170 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1171 {
1172         hold_rsb(r);
1173         lkb->lkb_resource = r;
1174 }
1175
1176 static void detach_lkb(struct dlm_lkb *lkb)
1177 {
1178         if (lkb->lkb_resource) {
1179                 put_rsb(lkb->lkb_resource);
1180                 lkb->lkb_resource = NULL;
1181         }
1182 }
1183
1184 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1185 {
1186         struct dlm_lkb *lkb;
1187         int rv;
1188
1189         lkb = dlm_allocate_lkb(ls);
1190         if (!lkb)
1191                 return -ENOMEM;
1192
1193         lkb->lkb_nodeid = -1;
1194         lkb->lkb_grmode = DLM_LOCK_IV;
1195         kref_init(&lkb->lkb_ref);
1196         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1197         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1198         INIT_LIST_HEAD(&lkb->lkb_time_list);
1199         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1200         mutex_init(&lkb->lkb_cb_mutex);
1201         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1202
1203         idr_preload(GFP_NOFS);
1204         spin_lock(&ls->ls_lkbidr_spin);
1205         rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
1206         if (rv >= 0)
1207                 lkb->lkb_id = rv;
1208         spin_unlock(&ls->ls_lkbidr_spin);
1209         idr_preload_end();
1210
1211         if (rv < 0) {
1212                 log_error(ls, "create_lkb idr error %d", rv);
1213                 return rv;
1214         }
1215
1216         *lkb_ret = lkb;
1217         return 0;
1218 }
1219
1220 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1221 {
1222         struct dlm_lkb *lkb;
1223
1224         spin_lock(&ls->ls_lkbidr_spin);
1225         lkb = idr_find(&ls->ls_lkbidr, lkid);
1226         if (lkb)
1227                 kref_get(&lkb->lkb_ref);
1228         spin_unlock(&ls->ls_lkbidr_spin);
1229
1230         *lkb_ret = lkb;
1231         return lkb ? 0 : -ENOENT;
1232 }
1233
1234 static void kill_lkb(struct kref *kref)
1235 {
1236         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1237
1238         /* All work is done after the return from kref_put() so we
1239            can release the write_lock before the detach_lkb */
1240
1241         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1242 }
1243
1244 /* __put_lkb() is used when an lkb may not have an rsb attached to
1245    it so we need to provide the lockspace explicitly */
1246
1247 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1248 {
1249         uint32_t lkid = lkb->lkb_id;
1250
1251         spin_lock(&ls->ls_lkbidr_spin);
1252         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1253                 idr_remove(&ls->ls_lkbidr, lkid);
1254                 spin_unlock(&ls->ls_lkbidr_spin);
1255
1256                 detach_lkb(lkb);
1257
1258                 /* for local/process lkbs, lvbptr points to caller's lksb */
1259                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1260                         dlm_free_lvb(lkb->lkb_lvbptr);
1261                 dlm_free_lkb(lkb);
1262                 return 1;
1263         } else {
1264                 spin_unlock(&ls->ls_lkbidr_spin);
1265                 return 0;
1266         }
1267 }
1268
1269 int dlm_put_lkb(struct dlm_lkb *lkb)
1270 {
1271         struct dlm_ls *ls;
1272
1273         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1274         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1275
1276         ls = lkb->lkb_resource->res_ls;
1277         return __put_lkb(ls, lkb);
1278 }
1279
1280 /* This is only called to add a reference when the code already holds
1281    a valid reference to the lkb, so there's no need for locking. */
1282
1283 static inline void hold_lkb(struct dlm_lkb *lkb)
1284 {
1285         kref_get(&lkb->lkb_ref);
1286 }
1287
1288 /* This is called when we need to remove a reference and are certain
1289    it's not the last ref.  e.g. del_lkb is always called between a
1290    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1291    put_lkb would work fine, but would involve unnecessary locking */
1292
1293 static inline void unhold_lkb(struct dlm_lkb *lkb)
1294 {
1295         int rv;
1296         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1297         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1298 }
1299
1300 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1301                             int mode)
1302 {
1303         struct dlm_lkb *lkb = NULL;
1304
1305         list_for_each_entry(lkb, head, lkb_statequeue)
1306                 if (lkb->lkb_rqmode < mode)
1307                         break;
1308
1309         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1310 }
1311
1312 /* add/remove lkb to rsb's grant/convert/wait queue */
1313
1314 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1315 {
1316         kref_get(&lkb->lkb_ref);
1317
1318         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1319
1320         lkb->lkb_timestamp = ktime_get();
1321
1322         lkb->lkb_status = status;
1323
1324         switch (status) {
1325         case DLM_LKSTS_WAITING:
1326                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1327                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1328                 else
1329                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1330                 break;
1331         case DLM_LKSTS_GRANTED:
1332                 /* convention says granted locks kept in order of grmode */
1333                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1334                                 lkb->lkb_grmode);
1335                 break;
1336         case DLM_LKSTS_CONVERT:
1337                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1338                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1339                 else
1340                         list_add_tail(&lkb->lkb_statequeue,
1341                                       &r->res_convertqueue);
1342                 break;
1343         default:
1344                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1345         }
1346 }
1347
1348 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1349 {
1350         lkb->lkb_status = 0;
1351         list_del(&lkb->lkb_statequeue);
1352         unhold_lkb(lkb);
1353 }
1354
1355 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1356 {
1357         hold_lkb(lkb);
1358         del_lkb(r, lkb);
1359         add_lkb(r, lkb, sts);
1360         unhold_lkb(lkb);
1361 }
1362
1363 static int msg_reply_type(int mstype)
1364 {
1365         switch (mstype) {
1366         case DLM_MSG_REQUEST:
1367                 return DLM_MSG_REQUEST_REPLY;
1368         case DLM_MSG_CONVERT:
1369                 return DLM_MSG_CONVERT_REPLY;
1370         case DLM_MSG_UNLOCK:
1371                 return DLM_MSG_UNLOCK_REPLY;
1372         case DLM_MSG_CANCEL:
1373                 return DLM_MSG_CANCEL_REPLY;
1374         case DLM_MSG_LOOKUP:
1375                 return DLM_MSG_LOOKUP_REPLY;
1376         }
1377         return -1;
1378 }
1379
1380 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1381 {
1382         int i;
1383
1384         for (i = 0; i < num_nodes; i++) {
1385                 if (!warned[i]) {
1386                         warned[i] = nodeid;
1387                         return 0;
1388                 }
1389                 if (warned[i] == nodeid)
1390                         return 1;
1391         }
1392         return 0;
1393 }
1394
1395 void dlm_scan_waiters(struct dlm_ls *ls)
1396 {
1397         struct dlm_lkb *lkb;
1398         ktime_t zero = ktime_set(0, 0);
1399         s64 us;
1400         s64 debug_maxus = 0;
1401         u32 debug_scanned = 0;
1402         u32 debug_expired = 0;
1403         int num_nodes = 0;
1404         int *warned = NULL;
1405
1406         if (!dlm_config.ci_waitwarn_us)
1407                 return;
1408
1409         mutex_lock(&ls->ls_waiters_mutex);
1410
1411         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1412                 if (ktime_equal(lkb->lkb_wait_time, zero))
1413                         continue;
1414
1415                 debug_scanned++;
1416
1417                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1418
1419                 if (us < dlm_config.ci_waitwarn_us)
1420                         continue;
1421
1422                 lkb->lkb_wait_time = zero;
1423
1424                 debug_expired++;
1425                 if (us > debug_maxus)
1426                         debug_maxus = us;
1427
1428                 if (!num_nodes) {
1429                         num_nodes = ls->ls_num_nodes;
1430                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
1431                 }
1432                 if (!warned)
1433                         continue;
1434                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1435                         continue;
1436
1437                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1438                           "node %d", lkb->lkb_id, (long long)us,
1439                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1440         }
1441         mutex_unlock(&ls->ls_waiters_mutex);
1442         kfree(warned);
1443
1444         if (debug_expired)
1445                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1446                           debug_scanned, debug_expired,
1447                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1448 }
1449
1450 /* add/remove lkb from global waiters list of lkb's waiting for
1451    a reply from a remote node */
1452
1453 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1454 {
1455         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1456         int error = 0;
1457
1458         mutex_lock(&ls->ls_waiters_mutex);
1459
1460         if (is_overlap_unlock(lkb) ||
1461             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1462                 error = -EINVAL;
1463                 goto out;
1464         }
1465
1466         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1467                 switch (mstype) {
1468                 case DLM_MSG_UNLOCK:
1469                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1470                         break;
1471                 case DLM_MSG_CANCEL:
1472                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1473                         break;
1474                 default:
1475                         error = -EBUSY;
1476                         goto out;
1477                 }
1478                 lkb->lkb_wait_count++;
1479                 hold_lkb(lkb);
1480
1481                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1482                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1483                           lkb->lkb_wait_count, lkb->lkb_flags);
1484                 goto out;
1485         }
1486
1487         DLM_ASSERT(!lkb->lkb_wait_count,
1488                    dlm_print_lkb(lkb);
1489                    printk("wait_count %d\n", lkb->lkb_wait_count););
1490
1491         lkb->lkb_wait_count++;
1492         lkb->lkb_wait_type = mstype;
1493         lkb->lkb_wait_time = ktime_get();
1494         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1495         hold_lkb(lkb);
1496         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1497  out:
1498         if (error)
1499                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1500                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1501                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1502         mutex_unlock(&ls->ls_waiters_mutex);
1503         return error;
1504 }
1505
1506 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1507    list as part of process_requestqueue (e.g. a lookup that has an optimized
1508    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1509    set RESEND and dlm_recover_waiters_post() */
1510
1511 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1512                                 struct dlm_message *ms)
1513 {
1514         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1515         int overlap_done = 0;
1516
1517         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1518                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1519                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1520                 overlap_done = 1;
1521                 goto out_del;
1522         }
1523
1524         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1525                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1526                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1527                 overlap_done = 1;
1528                 goto out_del;
1529         }
1530
1531         /* Cancel state was preemptively cleared by a successful convert,
1532            see next comment, nothing to do. */
1533
1534         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1535             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1536                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1537                           lkb->lkb_id, lkb->lkb_wait_type);
1538                 return -1;
1539         }
1540
1541         /* Remove for the convert reply, and premptively remove for the
1542            cancel reply.  A convert has been granted while there's still
1543            an outstanding cancel on it (the cancel is moot and the result
1544            in the cancel reply should be 0).  We preempt the cancel reply
1545            because the app gets the convert result and then can follow up
1546            with another op, like convert.  This subsequent op would see the
1547            lingering state of the cancel and fail with -EBUSY. */
1548
1549         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1550             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1551             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1552                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1553                           lkb->lkb_id);
1554                 lkb->lkb_wait_type = 0;
1555                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1556                 lkb->lkb_wait_count--;
1557                 goto out_del;
1558         }
1559
1560         /* N.B. type of reply may not always correspond to type of original
1561            msg due to lookup->request optimization, verify others? */
1562
1563         if (lkb->lkb_wait_type) {
1564                 lkb->lkb_wait_type = 0;
1565                 goto out_del;
1566         }
1567
1568         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1569                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1570                   mstype, lkb->lkb_flags);
1571         return -1;
1572
1573  out_del:
1574         /* the force-unlock/cancel has completed and we haven't recvd a reply
1575            to the op that was in progress prior to the unlock/cancel; we
1576            give up on any reply to the earlier op.  FIXME: not sure when/how
1577            this would happen */
1578
1579         if (overlap_done && lkb->lkb_wait_type) {
1580                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1581                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1582                 lkb->lkb_wait_count--;
1583                 lkb->lkb_wait_type = 0;
1584         }
1585
1586         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1587
1588         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1589         lkb->lkb_wait_count--;
1590         if (!lkb->lkb_wait_count)
1591                 list_del_init(&lkb->lkb_wait_reply);
1592         unhold_lkb(lkb);
1593         return 0;
1594 }
1595
1596 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1597 {
1598         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1599         int error;
1600
1601         mutex_lock(&ls->ls_waiters_mutex);
1602         error = _remove_from_waiters(lkb, mstype, NULL);
1603         mutex_unlock(&ls->ls_waiters_mutex);
1604         return error;
1605 }
1606
1607 /* Handles situations where we might be processing a "fake" or "stub" reply in
1608    which we can't try to take waiters_mutex again. */
1609
1610 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1611 {
1612         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1613         int error;
1614
1615         if (ms->m_flags != DLM_IFL_STUB_MS)
1616                 mutex_lock(&ls->ls_waiters_mutex);
1617         error = _remove_from_waiters(lkb, ms->m_type, ms);
1618         if (ms->m_flags != DLM_IFL_STUB_MS)
1619                 mutex_unlock(&ls->ls_waiters_mutex);
1620         return error;
1621 }
1622
1623 /* If there's an rsb for the same resource being removed, ensure
1624    that the remove message is sent before the new lookup message.
1625    It should be rare to need a delay here, but if not, then it may
1626    be worthwhile to add a proper wait mechanism rather than a delay. */
1627
1628 static void wait_pending_remove(struct dlm_rsb *r)
1629 {
1630         struct dlm_ls *ls = r->res_ls;
1631  restart:
1632         spin_lock(&ls->ls_remove_spin);
1633         if (ls->ls_remove_len &&
1634             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1635                 log_debug(ls, "delay lookup for remove dir %d %s",
1636                           r->res_dir_nodeid, r->res_name);
1637                 spin_unlock(&ls->ls_remove_spin);
1638                 msleep(1);
1639                 goto restart;
1640         }
1641         spin_unlock(&ls->ls_remove_spin);
1642 }
1643
1644 /*
1645  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1646  * read by other threads in wait_pending_remove.  ls_remove_names
1647  * and ls_remove_lens are only used by the scan thread, so they do
1648  * not need protection.
1649  */
1650
1651 static void shrink_bucket(struct dlm_ls *ls, int b)
1652 {
1653         struct rb_node *n, *next;
1654         struct dlm_rsb *r;
1655         char *name;
1656         int our_nodeid = dlm_our_nodeid();
1657         int remote_count = 0;
1658         int need_shrink = 0;
1659         int i, len, rv;
1660
1661         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1662
1663         spin_lock(&ls->ls_rsbtbl[b].lock);
1664
1665         if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1666                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1667                 return;
1668         }
1669
1670         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1671                 next = rb_next(n);
1672                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1673
1674                 /* If we're the directory record for this rsb, and
1675                    we're not the master of it, then we need to wait
1676                    for the master node to send us a dir remove for
1677                    before removing the dir record. */
1678
1679                 if (!dlm_no_directory(ls) &&
1680                     (r->res_master_nodeid != our_nodeid) &&
1681                     (dlm_dir_nodeid(r) == our_nodeid)) {
1682                         continue;
1683                 }
1684
1685                 need_shrink = 1;
1686
1687                 if (!time_after_eq(jiffies, r->res_toss_time +
1688                                    dlm_config.ci_toss_secs * HZ)) {
1689                         continue;
1690                 }
1691
1692                 if (!dlm_no_directory(ls) &&
1693                     (r->res_master_nodeid == our_nodeid) &&
1694                     (dlm_dir_nodeid(r) != our_nodeid)) {
1695
1696                         /* We're the master of this rsb but we're not
1697                            the directory record, so we need to tell the
1698                            dir node to remove the dir record. */
1699
1700                         ls->ls_remove_lens[remote_count] = r->res_length;
1701                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1702                                DLM_RESNAME_MAXLEN);
1703                         remote_count++;
1704
1705                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1706                                 break;
1707                         continue;
1708                 }
1709
1710                 if (!kref_put(&r->res_ref, kill_rsb)) {
1711                         log_error(ls, "tossed rsb in use %s", r->res_name);
1712                         continue;
1713                 }
1714
1715                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1716                 dlm_free_rsb(r);
1717         }
1718
1719         if (need_shrink)
1720                 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1721         else
1722                 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1723         spin_unlock(&ls->ls_rsbtbl[b].lock);
1724
1725         /*
1726          * While searching for rsb's to free, we found some that require
1727          * remote removal.  We leave them in place and find them again here
1728          * so there is a very small gap between removing them from the toss
1729          * list and sending the removal.  Keeping this gap small is
1730          * important to keep us (the master node) from being out of sync
1731          * with the remote dir node for very long.
1732          *
1733          * From the time the rsb is removed from toss until just after
1734          * send_remove, the rsb name is saved in ls_remove_name.  A new
1735          * lookup checks this to ensure that a new lookup message for the
1736          * same resource name is not sent just before the remove message.
1737          */
1738
1739         for (i = 0; i < remote_count; i++) {
1740                 name = ls->ls_remove_names[i];
1741                 len = ls->ls_remove_lens[i];
1742
1743                 spin_lock(&ls->ls_rsbtbl[b].lock);
1744                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1745                 if (rv) {
1746                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1747                         log_debug(ls, "remove_name not toss %s", name);
1748                         continue;
1749                 }
1750
1751                 if (r->res_master_nodeid != our_nodeid) {
1752                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1753                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1754                                   r->res_master_nodeid, r->res_dir_nodeid,
1755                                   our_nodeid, name);
1756                         continue;
1757                 }
1758
1759                 if (r->res_dir_nodeid == our_nodeid) {
1760                         /* should never happen */
1761                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1762                         log_error(ls, "remove_name dir %d master %d our %d %s",
1763                                   r->res_dir_nodeid, r->res_master_nodeid,
1764                                   our_nodeid, name);
1765                         continue;
1766                 }
1767
1768                 if (!time_after_eq(jiffies, r->res_toss_time +
1769                                    dlm_config.ci_toss_secs * HZ)) {
1770                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1771                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1772                                   r->res_toss_time, jiffies, name);
1773                         continue;
1774                 }
1775
1776                 if (!kref_put(&r->res_ref, kill_rsb)) {
1777                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1778                         log_error(ls, "remove_name in use %s", name);
1779                         continue;
1780                 }
1781
1782                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1783
1784                 /* block lookup of same name until we've sent remove */
1785                 spin_lock(&ls->ls_remove_spin);
1786                 ls->ls_remove_len = len;
1787                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1788                 spin_unlock(&ls->ls_remove_spin);
1789                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1790
1791                 send_remove(r);
1792
1793                 /* allow lookup of name again */
1794                 spin_lock(&ls->ls_remove_spin);
1795                 ls->ls_remove_len = 0;
1796                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1797                 spin_unlock(&ls->ls_remove_spin);
1798
1799                 dlm_free_rsb(r);
1800         }
1801 }
1802
1803 void dlm_scan_rsbs(struct dlm_ls *ls)
1804 {
1805         int i;
1806
1807         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1808                 shrink_bucket(ls, i);
1809                 if (dlm_locking_stopped(ls))
1810                         break;
1811                 cond_resched();
1812         }
1813 }
1814
1815 static void add_timeout(struct dlm_lkb *lkb)
1816 {
1817         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1818
1819         if (is_master_copy(lkb))
1820                 return;
1821
1822         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1823             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1824                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1825                 goto add_it;
1826         }
1827         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1828                 goto add_it;
1829         return;
1830
1831  add_it:
1832         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1833         mutex_lock(&ls->ls_timeout_mutex);
1834         hold_lkb(lkb);
1835         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1836         mutex_unlock(&ls->ls_timeout_mutex);
1837 }
1838
1839 static void del_timeout(struct dlm_lkb *lkb)
1840 {
1841         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1842
1843         mutex_lock(&ls->ls_timeout_mutex);
1844         if (!list_empty(&lkb->lkb_time_list)) {
1845                 list_del_init(&lkb->lkb_time_list);
1846                 unhold_lkb(lkb);
1847         }
1848         mutex_unlock(&ls->ls_timeout_mutex);
1849 }
1850
1851 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1852    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1853    and then lock rsb because of lock ordering in add_timeout.  We may need
1854    to specify some special timeout-related bits in the lkb that are just to
1855    be accessed under the timeout_mutex. */
1856
1857 void dlm_scan_timeout(struct dlm_ls *ls)
1858 {
1859         struct dlm_rsb *r;
1860         struct dlm_lkb *lkb;
1861         int do_cancel, do_warn;
1862         s64 wait_us;
1863
1864         for (;;) {
1865                 if (dlm_locking_stopped(ls))
1866                         break;
1867
1868                 do_cancel = 0;
1869                 do_warn = 0;
1870                 mutex_lock(&ls->ls_timeout_mutex);
1871                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1872
1873                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1874                                                         lkb->lkb_timestamp));
1875
1876                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1877                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1878                                 do_cancel = 1;
1879
1880                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1881                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1882                                 do_warn = 1;
1883
1884                         if (!do_cancel && !do_warn)
1885                                 continue;
1886                         hold_lkb(lkb);
1887                         break;
1888                 }
1889                 mutex_unlock(&ls->ls_timeout_mutex);
1890
1891                 if (!do_cancel && !do_warn)
1892                         break;
1893
1894                 r = lkb->lkb_resource;
1895                 hold_rsb(r);
1896                 lock_rsb(r);
1897
1898                 if (do_warn) {
1899                         /* clear flag so we only warn once */
1900                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1901                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1902                                 del_timeout(lkb);
1903                         dlm_timeout_warn(lkb);
1904                 }
1905
1906                 if (do_cancel) {
1907                         log_debug(ls, "timeout cancel %x node %d %s",
1908                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1909                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1910                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1911                         del_timeout(lkb);
1912                         _cancel_lock(r, lkb);
1913                 }
1914
1915                 unlock_rsb(r);
1916                 unhold_rsb(r);
1917                 dlm_put_lkb(lkb);
1918         }
1919 }
1920
1921 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1922    dlm_recoverd before checking/setting ls_recover_begin. */
1923
1924 void dlm_adjust_timeouts(struct dlm_ls *ls)
1925 {
1926         struct dlm_lkb *lkb;
1927         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1928
1929         ls->ls_recover_begin = 0;
1930         mutex_lock(&ls->ls_timeout_mutex);
1931         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1932                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1933         mutex_unlock(&ls->ls_timeout_mutex);
1934
1935         if (!dlm_config.ci_waitwarn_us)
1936                 return;
1937
1938         mutex_lock(&ls->ls_waiters_mutex);
1939         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1940                 if (ktime_to_us(lkb->lkb_wait_time))
1941                         lkb->lkb_wait_time = ktime_get();
1942         }
1943         mutex_unlock(&ls->ls_waiters_mutex);
1944 }
1945
1946 /* lkb is master or local copy */
1947
1948 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1949 {
1950         int b, len = r->res_ls->ls_lvblen;
1951
1952         /* b=1 lvb returned to caller
1953            b=0 lvb written to rsb or invalidated
1954            b=-1 do nothing */
1955
1956         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1957
1958         if (b == 1) {
1959                 if (!lkb->lkb_lvbptr)
1960                         return;
1961
1962                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1963                         return;
1964
1965                 if (!r->res_lvbptr)
1966                         return;
1967
1968                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1969                 lkb->lkb_lvbseq = r->res_lvbseq;
1970
1971         } else if (b == 0) {
1972                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1973                         rsb_set_flag(r, RSB_VALNOTVALID);
1974                         return;
1975                 }
1976
1977                 if (!lkb->lkb_lvbptr)
1978                         return;
1979
1980                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1981                         return;
1982
1983                 if (!r->res_lvbptr)
1984                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1985
1986                 if (!r->res_lvbptr)
1987                         return;
1988
1989                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1990                 r->res_lvbseq++;
1991                 lkb->lkb_lvbseq = r->res_lvbseq;
1992                 rsb_clear_flag(r, RSB_VALNOTVALID);
1993         }
1994
1995         if (rsb_flag(r, RSB_VALNOTVALID))
1996                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1997 }
1998
1999 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2000 {
2001         if (lkb->lkb_grmode < DLM_LOCK_PW)
2002                 return;
2003
2004         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2005                 rsb_set_flag(r, RSB_VALNOTVALID);
2006                 return;
2007         }
2008
2009         if (!lkb->lkb_lvbptr)
2010                 return;
2011
2012         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2013                 return;
2014
2015         if (!r->res_lvbptr)
2016                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2017
2018         if (!r->res_lvbptr)
2019                 return;
2020
2021         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2022         r->res_lvbseq++;
2023         rsb_clear_flag(r, RSB_VALNOTVALID);
2024 }
2025
2026 /* lkb is process copy (pc) */
2027
2028 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2029                             struct dlm_message *ms)
2030 {
2031         int b;
2032
2033         if (!lkb->lkb_lvbptr)
2034                 return;
2035
2036         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2037                 return;
2038
2039         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2040         if (b == 1) {
2041                 int len = receive_extralen(ms);
2042                 if (len > r->res_ls->ls_lvblen)
2043                         len = r->res_ls->ls_lvblen;
2044                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2045                 lkb->lkb_lvbseq = ms->m_lvbseq;
2046         }
2047 }
2048
2049 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2050    remove_lock -- used for unlock, removes lkb from granted
2051    revert_lock -- used for cancel, moves lkb from convert to granted
2052    grant_lock  -- used for request and convert, adds lkb to granted or
2053                   moves lkb from convert or waiting to granted
2054
2055    Each of these is used for master or local copy lkb's.  There is
2056    also a _pc() variation used to make the corresponding change on
2057    a process copy (pc) lkb. */
2058
2059 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2060 {
2061         del_lkb(r, lkb);
2062         lkb->lkb_grmode = DLM_LOCK_IV;
2063         /* this unhold undoes the original ref from create_lkb()
2064            so this leads to the lkb being freed */
2065         unhold_lkb(lkb);
2066 }
2067
2068 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2069 {
2070         set_lvb_unlock(r, lkb);
2071         _remove_lock(r, lkb);
2072 }
2073
2074 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2075 {
2076         _remove_lock(r, lkb);
2077 }
2078
2079 /* returns: 0 did nothing
2080             1 moved lock to granted
2081            -1 removed lock */
2082
2083 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2084 {
2085         int rv = 0;
2086
2087         lkb->lkb_rqmode = DLM_LOCK_IV;
2088
2089         switch (lkb->lkb_status) {
2090         case DLM_LKSTS_GRANTED:
2091                 break;
2092         case DLM_LKSTS_CONVERT:
2093                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2094                 rv = 1;
2095                 break;
2096         case DLM_LKSTS_WAITING:
2097                 del_lkb(r, lkb);
2098                 lkb->lkb_grmode = DLM_LOCK_IV;
2099                 /* this unhold undoes the original ref from create_lkb()
2100                    so this leads to the lkb being freed */
2101                 unhold_lkb(lkb);
2102                 rv = -1;
2103                 break;
2104         default:
2105                 log_print("invalid status for revert %d", lkb->lkb_status);
2106         }
2107         return rv;
2108 }
2109
2110 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2111 {
2112         return revert_lock(r, lkb);
2113 }
2114
2115 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2116 {
2117         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2118                 lkb->lkb_grmode = lkb->lkb_rqmode;
2119                 if (lkb->lkb_status)
2120                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2121                 else
2122                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2123         }
2124
2125         lkb->lkb_rqmode = DLM_LOCK_IV;
2126         lkb->lkb_highbast = 0;
2127 }
2128
2129 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2130 {
2131         set_lvb_lock(r, lkb);
2132         _grant_lock(r, lkb);
2133 }
2134
2135 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2136                           struct dlm_message *ms)
2137 {
2138         set_lvb_lock_pc(r, lkb, ms);
2139         _grant_lock(r, lkb);
2140 }
2141
2142 /* called by grant_pending_locks() which means an async grant message must
2143    be sent to the requesting node in addition to granting the lock if the
2144    lkb belongs to a remote node. */
2145
2146 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2147 {
2148         grant_lock(r, lkb);
2149         if (is_master_copy(lkb))
2150                 send_grant(r, lkb);
2151         else
2152                 queue_cast(r, lkb, 0);
2153 }
2154
2155 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2156    change the granted/requested modes.  We're munging things accordingly in
2157    the process copy.
2158    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2159    conversion deadlock
2160    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2161    compatible with other granted locks */
2162
2163 static void munge_demoted(struct dlm_lkb *lkb)
2164 {
2165         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2166                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2167                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2168                 return;
2169         }
2170
2171         lkb->lkb_grmode = DLM_LOCK_NL;
2172 }
2173
2174 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2175 {
2176         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2177             ms->m_type != DLM_MSG_GRANT) {
2178                 log_print("munge_altmode %x invalid reply type %d",
2179                           lkb->lkb_id, ms->m_type);
2180                 return;
2181         }
2182
2183         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2184                 lkb->lkb_rqmode = DLM_LOCK_PR;
2185         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2186                 lkb->lkb_rqmode = DLM_LOCK_CW;
2187         else {
2188                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2189                 dlm_print_lkb(lkb);
2190         }
2191 }
2192
2193 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2194 {
2195         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2196                                            lkb_statequeue);
2197         if (lkb->lkb_id == first->lkb_id)
2198                 return 1;
2199
2200         return 0;
2201 }
2202
2203 /* Check if the given lkb conflicts with another lkb on the queue. */
2204
2205 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2206 {
2207         struct dlm_lkb *this;
2208
2209         list_for_each_entry(this, head, lkb_statequeue) {
2210                 if (this == lkb)
2211                         continue;
2212                 if (!modes_compat(this, lkb))
2213                         return 1;
2214         }
2215         return 0;
2216 }
2217
2218 /*
2219  * "A conversion deadlock arises with a pair of lock requests in the converting
2220  * queue for one resource.  The granted mode of each lock blocks the requested
2221  * mode of the other lock."
2222  *
2223  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2224  * convert queue from being granted, then deadlk/demote lkb.
2225  *
2226  * Example:
2227  * Granted Queue: empty
2228  * Convert Queue: NL->EX (first lock)
2229  *                PR->EX (second lock)
2230  *
2231  * The first lock can't be granted because of the granted mode of the second
2232  * lock and the second lock can't be granted because it's not first in the
2233  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2234  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2235  * flag set and return DEMOTED in the lksb flags.
2236  *
2237  * Originally, this function detected conv-deadlk in a more limited scope:
2238  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2239  * - if lkb1 was the first entry in the queue (not just earlier), and was
2240  *   blocked by the granted mode of lkb2, and there was nothing on the
2241  *   granted queue preventing lkb1 from being granted immediately, i.e.
2242  *   lkb2 was the only thing preventing lkb1 from being granted.
2243  *
2244  * That second condition meant we'd only say there was conv-deadlk if
2245  * resolving it (by demotion) would lead to the first lock on the convert
2246  * queue being granted right away.  It allowed conversion deadlocks to exist
2247  * between locks on the convert queue while they couldn't be granted anyway.
2248  *
2249  * Now, we detect and take action on conversion deadlocks immediately when
2250  * they're created, even if they may not be immediately consequential.  If
2251  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2252  * mode that would prevent lkb1's conversion from being granted, we do a
2253  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2254  * I think this means that the lkb_is_ahead condition below should always
2255  * be zero, i.e. there will never be conv-deadlk between two locks that are
2256  * both already on the convert queue.
2257  */
2258
2259 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2260 {
2261         struct dlm_lkb *lkb1;
2262         int lkb_is_ahead = 0;
2263
2264         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2265                 if (lkb1 == lkb2) {
2266                         lkb_is_ahead = 1;
2267                         continue;
2268                 }
2269
2270                 if (!lkb_is_ahead) {
2271                         if (!modes_compat(lkb2, lkb1))
2272                                 return 1;
2273                 } else {
2274                         if (!modes_compat(lkb2, lkb1) &&
2275                             !modes_compat(lkb1, lkb2))
2276                                 return 1;
2277                 }
2278         }
2279         return 0;
2280 }
2281
2282 /*
2283  * Return 1 if the lock can be granted, 0 otherwise.
2284  * Also detect and resolve conversion deadlocks.
2285  *
2286  * lkb is the lock to be granted
2287  *
2288  * now is 1 if the function is being called in the context of the
2289  * immediate request, it is 0 if called later, after the lock has been
2290  * queued.
2291  *
2292  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2293  * after recovery.
2294  *
2295  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2296  */
2297
2298 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2299                            int recover)
2300 {
2301         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2302
2303         /*
2304          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2305          * a new request for a NL mode lock being blocked.
2306          *
2307          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2308          * request, then it would be granted.  In essence, the use of this flag
2309          * tells the Lock Manager to expedite theis request by not considering
2310          * what may be in the CONVERTING or WAITING queues...  As of this
2311          * writing, the EXPEDITE flag can be used only with new requests for NL
2312          * mode locks.  This flag is not valid for conversion requests.
2313          *
2314          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2315          * conversion or used with a non-NL requested mode.  We also know an
2316          * EXPEDITE request is always granted immediately, so now must always
2317          * be 1.  The full condition to grant an expedite request: (now &&
2318          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2319          * therefore be shortened to just checking the flag.
2320          */
2321
2322         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2323                 return 1;
2324
2325         /*
2326          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2327          * added to the remaining conditions.
2328          */
2329
2330         if (queue_conflict(&r->res_grantqueue, lkb))
2331                 return 0;
2332
2333         /*
2334          * 6-3: By default, a conversion request is immediately granted if the
2335          * requested mode is compatible with the modes of all other granted
2336          * locks
2337          */
2338
2339         if (queue_conflict(&r->res_convertqueue, lkb))
2340                 return 0;
2341
2342         /*
2343          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2344          * locks for a recovered rsb, on which lkb's have been rebuilt.
2345          * The lkb's may have been rebuilt on the queues in a different
2346          * order than they were in on the previous master.  So, granting
2347          * queued conversions in order after recovery doesn't make sense
2348          * since the order hasn't been preserved anyway.  The new order
2349          * could also have created a new "in place" conversion deadlock.
2350          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2351          * After recovery, there would be no granted locks, and possibly
2352          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2353          * recovery, grant conversions without considering order.
2354          */
2355
2356         if (conv && recover)
2357                 return 1;
2358
2359         /*
2360          * 6-5: But the default algorithm for deciding whether to grant or
2361          * queue conversion requests does not by itself guarantee that such
2362          * requests are serviced on a "first come first serve" basis.  This, in
2363          * turn, can lead to a phenomenon known as "indefinate postponement".
2364          *
2365          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2366          * the system service employed to request a lock conversion.  This flag
2367          * forces certain conversion requests to be queued, even if they are
2368          * compatible with the granted modes of other locks on the same
2369          * resource.  Thus, the use of this flag results in conversion requests
2370          * being ordered on a "first come first servce" basis.
2371          *
2372          * DCT: This condition is all about new conversions being able to occur
2373          * "in place" while the lock remains on the granted queue (assuming
2374          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2375          * doesn't _have_ to go onto the convert queue where it's processed in
2376          * order.  The "now" variable is necessary to distinguish converts
2377          * being received and processed for the first time now, because once a
2378          * convert is moved to the conversion queue the condition below applies
2379          * requiring fifo granting.
2380          */
2381
2382         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2383                 return 1;
2384
2385         /*
2386          * Even if the convert is compat with all granted locks,
2387          * QUECVT forces it behind other locks on the convert queue.
2388          */
2389
2390         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2391                 if (list_empty(&r->res_convertqueue))
2392                         return 1;
2393                 else
2394                         return 0;
2395         }
2396
2397         /*
2398          * The NOORDER flag is set to avoid the standard vms rules on grant
2399          * order.
2400          */
2401
2402         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2403                 return 1;
2404
2405         /*
2406          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2407          * granted until all other conversion requests ahead of it are granted
2408          * and/or canceled.
2409          */
2410
2411         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2412                 return 1;
2413
2414         /*
2415          * 6-4: By default, a new request is immediately granted only if all
2416          * three of the following conditions are satisfied when the request is
2417          * issued:
2418          * - The queue of ungranted conversion requests for the resource is
2419          *   empty.
2420          * - The queue of ungranted new requests for the resource is empty.
2421          * - The mode of the new request is compatible with the most
2422          *   restrictive mode of all granted locks on the resource.
2423          */
2424
2425         if (now && !conv && list_empty(&r->res_convertqueue) &&
2426             list_empty(&r->res_waitqueue))
2427                 return 1;
2428
2429         /*
2430          * 6-4: Once a lock request is in the queue of ungranted new requests,
2431          * it cannot be granted until the queue of ungranted conversion
2432          * requests is empty, all ungranted new requests ahead of it are
2433          * granted and/or canceled, and it is compatible with the granted mode
2434          * of the most restrictive lock granted on the resource.
2435          */
2436
2437         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2438             first_in_list(lkb, &r->res_waitqueue))
2439                 return 1;
2440
2441         return 0;
2442 }
2443
2444 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2445                           int recover, int *err)
2446 {
2447         int rv;
2448         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2449         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2450
2451         if (err)
2452                 *err = 0;
2453
2454         rv = _can_be_granted(r, lkb, now, recover);
2455         if (rv)
2456                 goto out;
2457
2458         /*
2459          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2460          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2461          * cancels one of the locks.
2462          */
2463
2464         if (is_convert && can_be_queued(lkb) &&
2465             conversion_deadlock_detect(r, lkb)) {
2466                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2467                         lkb->lkb_grmode = DLM_LOCK_NL;
2468                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2469                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2470                         if (err)
2471                                 *err = -EDEADLK;
2472                         else {
2473                                 log_print("can_be_granted deadlock %x now %d",
2474                                           lkb->lkb_id, now);
2475                                 dlm_dump_rsb(r);
2476                         }
2477                 }
2478                 goto out;
2479         }
2480
2481         /*
2482          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2483          * to grant a request in a mode other than the normal rqmode.  It's a
2484          * simple way to provide a big optimization to applications that can
2485          * use them.
2486          */
2487
2488         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2489                 alt = DLM_LOCK_PR;
2490         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2491                 alt = DLM_LOCK_CW;
2492
2493         if (alt) {
2494                 lkb->lkb_rqmode = alt;
2495                 rv = _can_be_granted(r, lkb, now, 0);
2496                 if (rv)
2497                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2498                 else
2499                         lkb->lkb_rqmode = rqmode;
2500         }
2501  out:
2502         return rv;
2503 }
2504
2505 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2506    for locks pending on the convert list.  Once verified (watch for these
2507    log_prints), we should be able to just call _can_be_granted() and not
2508    bother with the demote/deadlk cases here (and there's no easy way to deal
2509    with a deadlk here, we'd have to generate something like grant_lock with
2510    the deadlk error.) */
2511
2512 /* Returns the highest requested mode of all blocked conversions; sets
2513    cw if there's a blocked conversion to DLM_LOCK_CW. */
2514
2515 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2516                                  unsigned int *count)
2517 {
2518         struct dlm_lkb *lkb, *s;
2519         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2520         int hi, demoted, quit, grant_restart, demote_restart;
2521         int deadlk;
2522
2523         quit = 0;
2524  restart:
2525         grant_restart = 0;
2526         demote_restart = 0;
2527         hi = DLM_LOCK_IV;
2528
2529         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2530                 demoted = is_demoted(lkb);
2531                 deadlk = 0;
2532
2533                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2534                         grant_lock_pending(r, lkb);
2535                         grant_restart = 1;
2536                         if (count)
2537                                 (*count)++;
2538                         continue;
2539                 }
2540
2541                 if (!demoted && is_demoted(lkb)) {
2542                         log_print("WARN: pending demoted %x node %d %s",
2543                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2544                         demote_restart = 1;
2545                         continue;
2546                 }
2547
2548                 if (deadlk) {
2549                         log_print("WARN: pending deadlock %x node %d %s",
2550                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2551                         dlm_dump_rsb(r);
2552                         continue;
2553                 }
2554
2555                 hi = max_t(int, lkb->lkb_rqmode, hi);
2556
2557                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2558                         *cw = 1;
2559         }
2560
2561         if (grant_restart)
2562                 goto restart;
2563         if (demote_restart && !quit) {
2564                 quit = 1;
2565                 goto restart;
2566         }
2567
2568         return max_t(int, high, hi);
2569 }
2570
2571 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2572                               unsigned int *count)
2573 {
2574         struct dlm_lkb *lkb, *s;
2575
2576         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2577                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2578                         grant_lock_pending(r, lkb);
2579                         if (count)
2580                                 (*count)++;
2581                 } else {
2582                         high = max_t(int, lkb->lkb_rqmode, high);
2583                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2584                                 *cw = 1;
2585                 }
2586         }
2587
2588         return high;
2589 }
2590
2591 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2592    on either the convert or waiting queue.
2593    high is the largest rqmode of all locks blocked on the convert or
2594    waiting queue. */
2595
2596 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2597 {
2598         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2599                 if (gr->lkb_highbast < DLM_LOCK_EX)
2600                         return 1;
2601                 return 0;
2602         }
2603
2604         if (gr->lkb_highbast < high &&
2605             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2606                 return 1;
2607         return 0;
2608 }
2609
2610 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2611 {
2612         struct dlm_lkb *lkb, *s;
2613         int high = DLM_LOCK_IV;
2614         int cw = 0;
2615
2616         if (!is_master(r)) {
2617                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2618                 dlm_dump_rsb(r);
2619                 return;
2620         }
2621
2622         high = grant_pending_convert(r, high, &cw, count);
2623         high = grant_pending_wait(r, high, &cw, count);
2624
2625         if (high == DLM_LOCK_IV)
2626                 return;
2627
2628         /*
2629          * If there are locks left on the wait/convert queue then send blocking
2630          * ASTs to granted locks based on the largest requested mode (high)
2631          * found above.
2632          */
2633
2634         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2635                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2636                         if (cw && high == DLM_LOCK_PR &&
2637                             lkb->lkb_grmode == DLM_LOCK_PR)
2638                                 queue_bast(r, lkb, DLM_LOCK_CW);
2639                         else
2640                                 queue_bast(r, lkb, high);
2641                         lkb->lkb_highbast = high;
2642                 }
2643         }
2644 }
2645
2646 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2647 {
2648         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2649             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2650                 if (gr->lkb_highbast < DLM_LOCK_EX)
2651                         return 1;
2652                 return 0;
2653         }
2654
2655         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2656                 return 1;
2657         return 0;
2658 }
2659
2660 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2661                             struct dlm_lkb *lkb)
2662 {
2663         struct dlm_lkb *gr;
2664
2665         list_for_each_entry(gr, head, lkb_statequeue) {
2666                 /* skip self when sending basts to convertqueue */
2667                 if (gr == lkb)
2668                         continue;
2669                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2670                         queue_bast(r, gr, lkb->lkb_rqmode);
2671                         gr->lkb_highbast = lkb->lkb_rqmode;
2672                 }
2673         }
2674 }
2675
2676 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2677 {
2678         send_bast_queue(r, &r->res_grantqueue, lkb);
2679 }
2680
2681 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2682 {
2683         send_bast_queue(r, &r->res_grantqueue, lkb);
2684         send_bast_queue(r, &r->res_convertqueue, lkb);
2685 }
2686
2687 /* set_master(r, lkb) -- set the master nodeid of a resource
2688
2689    The purpose of this function is to set the nodeid field in the given
2690    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2691    known, it can just be copied to the lkb and the function will return
2692    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2693    before it can be copied to the lkb.
2694
2695    When the rsb nodeid is being looked up remotely, the initial lkb
2696    causing the lookup is kept on the ls_waiters list waiting for the
2697    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2698    on the rsb's res_lookup list until the master is verified.
2699
2700    Return values:
2701    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2702    1: the rsb master is not available and the lkb has been placed on
2703       a wait queue
2704 */
2705
2706 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2707 {
2708         int our_nodeid = dlm_our_nodeid();
2709
2710         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2711                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2712                 r->res_first_lkid = lkb->lkb_id;
2713                 lkb->lkb_nodeid = r->res_nodeid;
2714                 return 0;
2715         }
2716
2717         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2718                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2719                 return 1;
2720         }
2721
2722         if (r->res_master_nodeid == our_nodeid) {
2723                 lkb->lkb_nodeid = 0;
2724                 return 0;
2725         }
2726
2727         if (r->res_master_nodeid) {
2728                 lkb->lkb_nodeid = r->res_master_nodeid;
2729                 return 0;
2730         }
2731
2732         if (dlm_dir_nodeid(r) == our_nodeid) {
2733                 /* This is a somewhat unusual case; find_rsb will usually
2734                    have set res_master_nodeid when dir nodeid is local, but
2735                    there are cases where we become the dir node after we've
2736                    past find_rsb and go through _request_lock again.
2737                    confirm_master() or process_lookup_list() needs to be
2738                    called after this. */
2739                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2740                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2741                           r->res_name);
2742                 r->res_master_nodeid = our_nodeid;
2743                 r->res_nodeid = 0;
2744                 lkb->lkb_nodeid = 0;
2745                 return 0;
2746         }
2747
2748         wait_pending_remove(r);
2749
2750         r->res_first_lkid = lkb->lkb_id;
2751         send_lookup(r, lkb);
2752         return 1;
2753 }
2754
2755 static void process_lookup_list(struct dlm_rsb *r)
2756 {
2757         struct dlm_lkb *lkb, *safe;
2758
2759         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2760                 list_del_init(&lkb->lkb_rsb_lookup);
2761                 _request_lock(r, lkb);
2762                 schedule();
2763         }
2764 }
2765
2766 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2767
2768 static void confirm_master(struct dlm_rsb *r, int error)
2769 {
2770         struct dlm_lkb *lkb;
2771
2772         if (!r->res_first_lkid)
2773                 return;
2774
2775         switch (error) {
2776         case 0:
2777         case -EINPROGRESS:
2778                 r->res_first_lkid = 0;
2779                 process_lookup_list(r);
2780                 break;
2781
2782         case -EAGAIN:
2783         case -EBADR:
2784         case -ENOTBLK:
2785                 /* the remote request failed and won't be retried (it was
2786                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2787                    lkb the first_lkid */
2788
2789                 r->res_first_lkid = 0;
2790
2791                 if (!list_empty(&r->res_lookup)) {
2792                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2793                                          lkb_rsb_lookup);
2794                         list_del_init(&lkb->lkb_rsb_lookup);
2795                         r->res_first_lkid = lkb->lkb_id;
2796                         _request_lock(r, lkb);
2797                 }
2798                 break;
2799
2800         default:
2801                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2802         }
2803 }
2804
2805 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2806                          int namelen, unsigned long timeout_cs,
2807                          void (*ast) (void *astparam),
2808                          void *astparam,
2809                          void (*bast) (void *astparam, int mode),
2810                          struct dlm_args *args)
2811 {
2812         int rv = -EINVAL;
2813
2814         /* check for invalid arg usage */
2815
2816         if (mode < 0 || mode > DLM_LOCK_EX)
2817                 goto out;
2818
2819         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2820                 goto out;
2821
2822         if (flags & DLM_LKF_CANCEL)
2823                 goto out;
2824
2825         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2826                 goto out;
2827
2828         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2829                 goto out;
2830
2831         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2832                 goto out;
2833
2834         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2835                 goto out;
2836
2837         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2838                 goto out;
2839
2840         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2841                 goto out;
2842
2843         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2844                 goto out;
2845
2846         if (!ast || !lksb)
2847                 goto out;
2848
2849         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2850                 goto out;
2851
2852         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2853                 goto out;
2854
2855         /* these args will be copied to the lkb in validate_lock_args,
2856            it cannot be done now because when converting locks, fields in
2857            an active lkb cannot be modified before locking the rsb */
2858
2859         args->flags = flags;
2860         args->astfn = ast;
2861         args->astparam = astparam;
2862         args->bastfn = bast;
2863         args->timeout = timeout_cs;
2864         args->mode = mode;
2865         args->lksb = lksb;
2866         rv = 0;
2867  out:
2868         return rv;
2869 }
2870
2871 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2872 {
2873         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2874                       DLM_LKF_FORCEUNLOCK))
2875                 return -EINVAL;
2876
2877         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2878                 return -EINVAL;
2879
2880         args->flags = flags;
2881         args->astparam = astarg;
2882         return 0;
2883 }
2884
2885 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2886                               struct dlm_args *args)
2887 {
2888         int rv = -EINVAL;
2889
2890         if (args->flags & DLM_LKF_CONVERT) {
2891                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2892                         goto out;
2893
2894                 if (args->flags & DLM_LKF_QUECVT &&
2895                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2896                         goto out;
2897
2898                 rv = -EBUSY;
2899                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2900                         goto out;
2901
2902                 if (lkb->lkb_wait_type)
2903                         goto out;
2904
2905                 if (is_overlap(lkb))
2906                         goto out;
2907         }
2908
2909         lkb->lkb_exflags = args->flags;
2910         lkb->lkb_sbflags = 0;
2911         lkb->lkb_astfn = args->astfn;
2912         lkb->lkb_astparam = args->astparam;
2913         lkb->lkb_bastfn = args->bastfn;
2914         lkb->lkb_rqmode = args->mode;
2915         lkb->lkb_lksb = args->lksb;
2916         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2917         lkb->lkb_ownpid = (int) current->pid;
2918         lkb->lkb_timeout_cs = args->timeout;
2919         rv = 0;
2920  out:
2921         if (rv)
2922                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2923                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2924                           lkb->lkb_status, lkb->lkb_wait_type,
2925                           lkb->lkb_resource->res_name);
2926         return rv;
2927 }
2928
2929 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2930    for success */
2931
2932 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2933    because there may be a lookup in progress and it's valid to do
2934    cancel/unlockf on it */
2935
2936 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2937 {
2938         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2939         int rv = -EINVAL;
2940
2941         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2942                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2943                 dlm_print_lkb(lkb);
2944                 goto out;
2945         }
2946
2947         /* an lkb may still exist even though the lock is EOL'ed due to a
2948            cancel, unlock or failed noqueue request; an app can't use these
2949            locks; return same error as if the lkid had not been found at all */
2950
2951         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2952                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2953                 rv = -ENOENT;
2954                 goto out;
2955         }
2956
2957         /* an lkb may be waiting for an rsb lookup to complete where the
2958            lookup was initiated by another lock */
2959
2960         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2961                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2962                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2963                         list_del_init(&lkb->lkb_rsb_lookup);
2964                         queue_cast(lkb->lkb_resource, lkb,
2965                                    args->flags & DLM_LKF_CANCEL ?
2966                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2967                         unhold_lkb(lkb); /* undoes create_lkb() */
2968                 }
2969                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2970                 rv = -EBUSY;
2971                 goto out;
2972         }
2973
2974         /* cancel not allowed with another cancel/unlock in progress */
2975
2976         if (args->flags & DLM_LKF_CANCEL) {
2977                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2978                         goto out;
2979
2980                 if (is_overlap(lkb))
2981                         goto out;
2982
2983                 /* don't let scand try to do a cancel */
2984                 del_timeout(lkb);
2985
2986                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2987                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2988                         rv = -EBUSY;
2989                         goto out;
2990                 }
2991
2992                 /* there's nothing to cancel */
2993                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2994                     !lkb->lkb_wait_type) {
2995                         rv = -EBUSY;
2996                         goto out;
2997                 }
2998
2999                 switch (lkb->lkb_wait_type) {
3000                 case DLM_MSG_LOOKUP:
3001                 case DLM_MSG_REQUEST:
3002                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3003                         rv = -EBUSY;
3004                         goto out;
3005                 case DLM_MSG_UNLOCK:
3006                 case DLM_MSG_CANCEL:
3007                         goto out;
3008                 }
3009                 /* add_to_waiters() will set OVERLAP_CANCEL */
3010                 goto out_ok;
3011         }
3012
3013         /* do we need to allow a force-unlock if there's a normal unlock
3014            already in progress?  in what conditions could the normal unlock
3015            fail such that we'd want to send a force-unlock to be sure? */
3016
3017         if (args->flags & DLM_LKF_FORCEUNLOCK) {
3018                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3019                         goto out;
3020
3021                 if (is_overlap_unlock(lkb))
3022                         goto out;
3023
3024                 /* don't let scand try to do a cancel */
3025                 del_timeout(lkb);
3026
3027                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3028                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3029                         rv = -EBUSY;
3030                         goto out;
3031                 }
3032
3033                 switch (lkb->lkb_wait_type) {
3034                 case DLM_MSG_LOOKUP:
3035                 case DLM_MSG_REQUEST:
3036                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3037                         rv = -EBUSY;
3038                         goto out;
3039                 case DLM_MSG_UNLOCK:
3040                         goto out;
3041                 }
3042                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3043                 goto out_ok;
3044         }
3045
3046         /* normal unlock not allowed if there's any op in progress */
3047         rv = -EBUSY;
3048         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3049                 goto out;
3050
3051  out_ok:
3052         /* an overlapping op shouldn't blow away exflags from other op */
3053         lkb->lkb_exflags |= args->flags;
3054         lkb->lkb_sbflags = 0;
3055         lkb->lkb_astparam = args->astparam;
3056         rv = 0;
3057  out:
3058         if (rv)
3059                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3060                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3061                           args->flags, lkb->lkb_wait_type,
3062                           lkb->lkb_resource->res_name);
3063         return rv;
3064 }
3065
3066 /*
3067  * Four stage 4 varieties:
3068  * do_request(), do_convert(), do_unlock(), do_cancel()
3069  * These are called on the master node for the given lock and
3070  * from the central locking logic.
3071  */
3072
3073 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3074 {
3075         int error = 0;
3076
3077         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3078                 grant_lock(r, lkb);
3079                 queue_cast(r, lkb, 0);
3080                 goto out;
3081         }
3082
3083         if (can_be_queued(lkb)) {
3084                 error = -EINPROGRESS;
3085                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3086                 add_timeout(lkb);
3087                 goto out;
3088         }
3089
3090         error = -EAGAIN;
3091         queue_cast(r, lkb, -EAGAIN);
3092  out:
3093         return error;
3094 }
3095
3096 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3097                                int error)
3098 {
3099         switch (error) {
3100         case -EAGAIN:
3101                 if (force_blocking_asts(lkb))
3102                         send_blocking_asts_all(r, lkb);
3103                 break;
3104         case -EINPROGRESS:
3105                 send_blocking_asts(r, lkb);
3106                 break;
3107         }
3108 }
3109
3110 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3111 {
3112         int error = 0;
3113         int deadlk = 0;
3114
3115         /* changing an existing lock may allow others to be granted */
3116
3117         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3118                 grant_lock(r, lkb);
3119                 queue_cast(r, lkb, 0);
3120                 goto out;
3121         }
3122
3123         /* can_be_granted() detected that this lock would block in a conversion
3124            deadlock, so we leave it on the granted queue and return EDEADLK in
3125            the ast for the convert. */
3126
3127         if (deadlk) {
3128                 /* it's left on the granted queue */
3129                 revert_lock(r, lkb);
3130                 queue_cast(r, lkb, -EDEADLK);
3131                 error = -EDEADLK;
3132                 goto out;
3133         }
3134
3135         /* is_demoted() means the can_be_granted() above set the grmode
3136            to NL, and left us on the granted queue.  This auto-demotion
3137            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3138            now grantable.  We have to try to grant other converting locks
3139            before we try again to grant this one. */
3140
3141         if (is_demoted(lkb)) {
3142                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3143                 if (_can_be_granted(r, lkb, 1, 0)) {
3144                         grant_lock(r, lkb);
3145                         queue_cast(r, lkb, 0);
3146                         goto out;
3147                 }
3148                 /* else fall through and move to convert queue */
3149         }
3150
3151         if (can_be_queued(lkb)) {
3152                 error = -EINPROGRESS;
3153                 del_lkb(r, lkb);
3154                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3155                 add_timeout(lkb);
3156                 goto out;
3157         }
3158
3159         error = -EAGAIN;
3160         queue_cast(r, lkb, -EAGAIN);
3161  out:
3162         return error;
3163 }
3164
3165 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166                                int error)
3167 {
3168         switch (error) {
3169         case 0:
3170                 grant_pending_locks(r, NULL);
3171                 /* grant_pending_locks also sends basts */
3172                 break;
3173         case -EAGAIN:
3174                 if (force_blocking_asts(lkb))
3175                         send_blocking_asts_all(r, lkb);
3176                 break;
3177         case -EINPROGRESS:
3178                 send_blocking_asts(r, lkb);
3179                 break;
3180         }
3181 }
3182
3183 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3184 {
3185         remove_lock(r, lkb);
3186         queue_cast(r, lkb, -DLM_EUNLOCK);
3187         return -DLM_EUNLOCK;
3188 }
3189
3190 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3191                               int error)
3192 {
3193         grant_pending_locks(r, NULL);
3194 }
3195
3196 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3197
3198 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3199 {
3200         int error;
3201
3202         error = revert_lock(r, lkb);
3203         if (error) {
3204                 queue_cast(r, lkb, -DLM_ECANCEL);
3205                 return -DLM_ECANCEL;
3206         }
3207         return 0;
3208 }
3209
3210 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3211                               int error)
3212 {
3213         if (error)
3214                 grant_pending_locks(r, NULL);
3215 }
3216
3217 /*
3218  * Four stage 3 varieties:
3219  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3220  */
3221
3222 /* add a new lkb to a possibly new rsb, called by requesting process */
3223
3224 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3225 {
3226         int error;
3227
3228         /* set_master: sets lkb nodeid from r */
3229
3230         error = set_master(r, lkb);
3231         if (error < 0)
3232                 goto out;
3233         if (error) {
3234                 error = 0;
3235                 goto out;
3236         }
3237
3238         if (is_remote(r)) {
3239                 /* receive_request() calls do_request() on remote node */
3240                 error = send_request(r, lkb);
3241         } else {
3242                 error = do_request(r, lkb);
3243                 /* for remote locks the request_reply is sent
3244                    between do_request and do_request_effects */
3245                 do_request_effects(r, lkb, error);
3246         }
3247  out:
3248         return error;
3249 }
3250
3251 /* change some property of an existing lkb, e.g. mode */
3252
3253 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3254 {
3255         int error;
3256
3257         if (is_remote(r)) {
3258                 /* receive_convert() calls do_convert() on remote node */
3259                 error = send_convert(r, lkb);
3260         } else {
3261                 error = do_convert(r, lkb);
3262                 /* for remote locks the convert_reply is sent
3263                    between do_convert and do_convert_effects */
3264                 do_convert_effects(r, lkb, error);
3265         }
3266
3267         return error;
3268 }
3269
3270 /* remove an existing lkb from the granted queue */
3271
3272 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3273 {
3274         int error;
3275
3276         if (is_remote(r)) {
3277                 /* receive_unlock() calls do_unlock() on remote node */
3278                 error = send_unlock(r, lkb);
3279         } else {
3280                 error = do_unlock(r, lkb);
3281                 /* for remote locks the unlock_reply is sent
3282                    between do_unlock and do_unlock_effects */
3283                 do_unlock_effects(r, lkb, error);
3284         }
3285
3286         return error;
3287 }
3288
3289 /* remove an existing lkb from the convert or wait queue */
3290
3291 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3292 {
3293         int error;
3294
3295         if (is_remote(r)) {
3296                 /* receive_cancel() calls do_cancel() on remote node */
3297                 error = send_cancel(r, lkb);
3298         } else {
3299                 error = do_cancel(r, lkb);
3300                 /* for remote locks the cancel_reply is sent
3301                    between do_cancel and do_cancel_effects */
3302                 do_cancel_effects(r, lkb, error);
3303         }
3304
3305         return error;
3306 }
3307
3308 /*
3309  * Four stage 2 varieties:
3310  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3311  */
3312
3313 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3314                         int len, struct dlm_args *args)
3315 {
3316         struct dlm_rsb *r;
3317         int error;
3318
3319         error = validate_lock_args(ls, lkb, args);
3320         if (error)
3321                 return error;
3322
3323         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3324         if (error)
3325                 return error;
3326
3327         lock_rsb(r);
3328
3329         attach_lkb(r, lkb);
3330         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3331
3332         error = _request_lock(r, lkb);
3333
3334         unlock_rsb(r);
3335         put_rsb(r);
3336         return error;
3337 }
3338
3339 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3340                         struct dlm_args *args)
3341 {
3342         struct dlm_rsb *r;
3343         int error;
3344
3345         r = lkb->lkb_resource;
3346
3347         hold_rsb(r);
3348         lock_rsb(r);
3349
3350         error = validate_lock_args(ls, lkb, args);
3351         if (error)
3352                 goto out;
3353
3354         error = _convert_lock(r, lkb);
3355  out:
3356         unlock_rsb(r);
3357         put_rsb(r);
3358         return error;
3359 }
3360
3361 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3362                        struct dlm_args *args)
3363 {
3364         struct dlm_rsb *r;
3365         int error;
3366
3367         r = lkb->lkb_resource;
3368
3369         hold_rsb(r);
3370         lock_rsb(r);
3371
3372         error = validate_unlock_args(lkb, args);
3373         if (error)
3374                 goto out;
3375
3376         error = _unlock_lock(r, lkb);
3377  out:
3378         unlock_rsb(r);
3379         put_rsb(r);
3380         return error;
3381 }
3382
3383 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3384                        struct dlm_args *args)
3385 {
3386         struct dlm_rsb *r;
3387         int error;
3388
3389         r = lkb->lkb_resource;
3390
3391         hold_rsb(r);
3392         lock_rsb(r);
3393
3394         error = validate_unlock_args(lkb, args);
3395         if (error)
3396                 goto out;
3397
3398         error = _cancel_lock(r, lkb);
3399  out:
3400         unlock_rsb(r);
3401         put_rsb(r);
3402         return error;
3403 }
3404
3405 /*
3406  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3407  */
3408
3409 int dlm_lock(dlm_lockspace_t *lockspace,
3410              int mode,
3411              struct dlm_lksb *lksb,
3412              uint32_t flags,
3413              void *name,
3414              unsigned int namelen,
3415              uint32_t parent_lkid,
3416              void (*ast) (void *astarg),
3417              void *astarg,
3418              void (*bast) (void *astarg, int mode))
3419 {
3420         struct dlm_ls *ls;
3421         struct dlm_lkb *lkb;
3422         struct dlm_args args;
3423         int error, convert = flags & DLM_LKF_CONVERT;
3424
3425         ls = dlm_find_lockspace_local(lockspace);
3426         if (!ls)
3427                 return -EINVAL;
3428
3429         dlm_lock_recovery(ls);
3430
3431         if (convert)
3432                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3433         else
3434                 error = create_lkb(ls, &lkb);
3435
3436         if (error)
3437                 goto out;
3438
3439         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3440                               astarg, bast, &args);
3441         if (error)
3442                 goto out_put;
3443
3444         if (convert)
3445                 error = convert_lock(ls, lkb, &args);
3446         else
3447                 error = request_lock(ls, lkb, name, namelen, &args);
3448
3449         if (error == -EINPROGRESS)
3450                 error = 0;
3451  out_put:
3452         if (convert || error)
3453                 __put_lkb(ls, lkb);
3454         if (error == -EAGAIN || error == -EDEADLK)
3455                 error = 0;
3456  out:
3457         dlm_unlock_recovery(ls);
3458         dlm_put_lockspace(ls);
3459         return error;
3460 }
3461
3462 int dlm_unlock(dlm_lockspace_t *lockspace,
3463                uint32_t lkid,
3464                uint32_t flags,
3465                struct dlm_lksb *lksb,
3466                void *astarg)
3467 {
3468         struct dlm_ls *ls;
3469         struct dlm_lkb *lkb;
3470         struct dlm_args args;
3471         int error;
3472
3473         ls = dlm_find_lockspace_local(lockspace);
3474         if (!ls)
3475                 return -EINVAL;
3476
3477         dlm_lock_recovery(ls);
3478
3479         error = find_lkb(ls, lkid, &lkb);
3480         if (error)
3481                 goto out;
3482
3483         error = set_unlock_args(flags, astarg, &args);
3484         if (error)
3485                 goto out_put;
3486
3487         if (flags & DLM_LKF_CANCEL)
3488                 error = cancel_lock(ls, lkb, &args);
3489         else
3490                 error = unlock_lock(ls, lkb, &args);
3491
3492         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3493                 error = 0;
3494         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3495                 error = 0;
3496  out_put:
3497         dlm_put_lkb(lkb);
3498  out:
3499         dlm_unlock_recovery(ls);
3500         dlm_put_lockspace(ls);
3501         return error;
3502 }
3503
3504 /*
3505  * send/receive routines for remote operations and replies
3506  *
3507  * send_args
3508  * send_common
3509  * send_request                 receive_request
3510  * send_convert                 receive_convert
3511  * send_unlock                  receive_unlock
3512  * send_cancel                  receive_cancel
3513  * send_grant                   receive_grant
3514  * send_bast                    receive_bast
3515  * send_lookup                  receive_lookup
3516  * send_remove                  receive_remove
3517  *
3518  *                              send_common_reply
3519  * receive_request_reply        send_request_reply
3520  * receive_convert_reply        send_convert_reply
3521  * receive_unlock_reply         send_unlock_reply
3522  * receive_cancel_reply         send_cancel_reply
3523  * receive_lookup_reply         send_lookup_reply
3524  */
3525
3526 static int _create_message(struct dlm_ls *ls, int mb_len,
3527                            int to_nodeid, int mstype,
3528                            struct dlm_message **ms_ret,
3529                            struct dlm_mhandle **mh_ret)
3530 {
3531         struct dlm_message *ms;
3532         struct dlm_mhandle *mh;
3533         char *mb;
3534
3535         /* get_buffer gives us a message handle (mh) that we need to
3536            pass into lowcomms_commit and a message buffer (mb) that we
3537            write our data into */
3538
3539         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3540         if (!mh)
3541                 return -ENOBUFS;
3542
3543         memset(mb, 0, mb_len);
3544
3545         ms = (struct dlm_message *) mb;
3546
3547         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3548         ms->m_header.h_lockspace = ls->ls_global_id;
3549         ms->m_header.h_nodeid = dlm_our_nodeid();
3550         ms->m_header.h_length = mb_len;
3551         ms->m_header.h_cmd = DLM_MSG;
3552
3553         ms->m_type = mstype;
3554
3555         *mh_ret = mh;
3556         *ms_ret = ms;
3557         return 0;
3558 }
3559
3560 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3561                           int to_nodeid, int mstype,
3562                           struct dlm_message **ms_ret,
3563                           struct dlm_mhandle **mh_ret)
3564 {
3565         int mb_len = sizeof(struct dlm_message);
3566
3567         switch (mstype) {
3568         case DLM_MSG_REQUEST:
3569         case DLM_MSG_LOOKUP:
3570         case DLM_MSG_REMOVE:
3571                 mb_len += r->res_length;
3572                 break;
3573         case DLM_MSG_CONVERT:
3574         case DLM_MSG_UNLOCK:
3575         case DLM_MSG_REQUEST_REPLY:
3576         case DLM_MSG_CONVERT_REPLY:
3577         case DLM_MSG_GRANT:
3578                 if (lkb && lkb->lkb_lvbptr)
3579                         mb_len += r->res_ls->ls_lvblen;
3580                 break;
3581         }
3582
3583         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3584                                ms_ret, mh_ret);
3585 }
3586
3587 /* further lowcomms enhancements or alternate implementations may make
3588    the return value from this function useful at some point */
3589
3590 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3591 {
3592         dlm_message_out(ms);
3593         dlm_lowcomms_commit_buffer(mh);
3594         return 0;
3595 }
3596
3597 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3598                       struct dlm_message *ms)
3599 {
3600         ms->m_nodeid   = lkb->lkb_nodeid;
3601         ms->m_pid      = lkb->lkb_ownpid;
3602         ms->m_lkid     = lkb->lkb_id;
3603         ms->m_remid    = lkb->lkb_remid;
3604         ms->m_exflags  = lkb->lkb_exflags;
3605         ms->m_sbflags  = lkb->lkb_sbflags;
3606         ms->m_flags    = lkb->lkb_flags;
3607         ms->m_lvbseq   = lkb->lkb_lvbseq;
3608         ms->m_status   = lkb->lkb_status;
3609         ms->m_grmode   = lkb->lkb_grmode;
3610         ms->m_rqmode   = lkb->lkb_rqmode;
3611         ms->m_hash     = r->res_hash;
3612
3613         /* m_result and m_bastmode are set from function args,
3614            not from lkb fields */
3615
3616         if (lkb->lkb_bastfn)
3617                 ms->m_asts |= DLM_CB_BAST;
3618         if (lkb->lkb_astfn)
3619                 ms->m_asts |= DLM_CB_CAST;
3620
3621         /* compare with switch in create_message; send_remove() doesn't
3622            use send_args() */
3623
3624         switch (ms->m_type) {
3625         case DLM_MSG_REQUEST:
3626         case DLM_MSG_LOOKUP:
3627                 memcpy(ms->m_extra, r->res_name, r->res_length);
3628                 break;
3629         case DLM_MSG_CONVERT:
3630         case DLM_MSG_UNLOCK:
3631         case DLM_MSG_REQUEST_REPLY:
3632         case DLM_MSG_CONVERT_REPLY:
3633         case DLM_MSG_GRANT:
3634                 if (!lkb->lkb_lvbptr)
3635                         break;
3636                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3637                 break;
3638         }
3639 }
3640
3641 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3642 {
3643         struct dlm_message *ms;
3644         struct dlm_mhandle *mh;
3645         int to_nodeid, error;
3646
3647         to_nodeid = r->res_nodeid;
3648
3649         error = add_to_waiters(lkb, mstype, to_nodeid);
3650         if (error)
3651                 return error;
3652
3653         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3654         if (error)
3655                 goto fail;
3656
3657         send_args(r, lkb, ms);
3658
3659         error = send_message(mh, ms);
3660         if (error)
3661                 goto fail;
3662         return 0;
3663
3664  fail:
3665         remove_from_waiters(lkb, msg_reply_type(mstype));
3666         return error;
3667 }
3668
3669 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3670 {
3671         return send_common(r, lkb, DLM_MSG_REQUEST);
3672 }
3673
3674 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3675 {
3676         int error;
3677
3678         error = send_common(r, lkb, DLM_MSG_CONVERT);
3679
3680         /* down conversions go without a reply from the master */
3681         if (!error && down_conversion(lkb)) {
3682                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3683                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3684                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3685                 r->res_ls->ls_stub_ms.m_result = 0;
3686                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3687         }
3688
3689         return error;
3690 }
3691
3692 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3693    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3694    that the master is still correct. */
3695
3696 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3697 {
3698         return send_common(r, lkb, DLM_MSG_UNLOCK);
3699 }
3700
3701 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3702 {
3703         return send_common(r, lkb, DLM_MSG_CANCEL);
3704 }
3705
3706 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3707 {
3708         struct dlm_message *ms;
3709         struct dlm_mhandle *mh;
3710         int to_nodeid, error;
3711
3712         to_nodeid = lkb->lkb_nodeid;
3713
3714         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3715         if (error)
3716                 goto out;
3717
3718         send_args(r, lkb, ms);
3719
3720         ms->m_result = 0;
3721
3722         error = send_message(mh, ms);
3723  out:
3724         return error;
3725 }
3726
3727 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3728 {
3729         struct dlm_message *ms;
3730         struct dlm_mhandle *mh;
3731         int to_nodeid, error;
3732
3733         to_nodeid = lkb->lkb_nodeid;
3734
3735         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3736         if (error)
3737                 goto out;
3738
3739         send_args(r, lkb, ms);
3740
3741         ms->m_bastmode = mode;
3742
3743         error = send_message(mh, ms);
3744  out:
3745         return error;
3746 }
3747
3748 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3749 {
3750         struct dlm_message *ms;
3751         struct dlm_mhandle *mh;
3752         int to_nodeid, error;
3753
3754         to_nodeid = dlm_dir_nodeid(r);
3755
3756         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3757         if (error)
3758                 return error;
3759
3760         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3761         if (error)
3762                 goto fail;
3763
3764         send_args(r, lkb, ms);
3765
3766         error = send_message(mh, ms);
3767         if (error)
3768                 goto fail;
3769         return 0;
3770
3771  fail:
3772         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3773         return error;
3774 }
3775
3776 static int send_remove(struct dlm_rsb *r)
3777 {
3778         struct dlm_message *ms;
3779         struct dlm_mhandle *mh;
3780         int to_nodeid, error;
3781
3782         to_nodeid = dlm_dir_nodeid(r);
3783
3784         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3785         if (error)
3786                 goto out;
3787
3788         memcpy(ms->m_extra, r->res_name, r->res_length);
3789         ms->m_hash = r->res_hash;
3790
3791         error = send_message(mh, ms);
3792  out:
3793         return error;
3794 }
3795
3796 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3797                              int mstype, int rv)
3798 {
3799         struct dlm_message *ms;
3800         struct dlm_mhandle *mh;
3801         int to_nodeid, error;
3802
3803         to_nodeid = lkb->lkb_nodeid;
3804
3805         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3806         if (error)
3807                 goto out;
3808
3809         send_args(r, lkb, ms);
3810
3811         ms->m_result = rv;
3812
3813         error = send_message(mh, ms);
3814  out:
3815         return error;
3816 }
3817
3818 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3819 {
3820         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3821 }
3822
3823 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824 {
3825         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3826 }
3827
3828 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3829 {
3830         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3831 }
3832
3833 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3834 {
3835         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3836 }
3837
3838 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3839                              int ret_nodeid, int rv)
3840 {
3841         struct dlm_rsb *r = &ls->ls_stub_rsb;
3842         struct dlm_message *ms;
3843         struct dlm_mhandle *mh;
3844         int error, nodeid = ms_in->m_header.h_nodeid;
3845
3846         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3847         if (error)
3848                 goto out;
3849
3850         ms->m_lkid = ms_in->m_lkid;
3851         ms->m_result = rv;
3852         ms->m_nodeid = ret_nodeid;
3853
3854         error = send_message(mh, ms);
3855  out:
3856         return error;
3857 }
3858
3859 /* which args we save from a received message depends heavily on the type
3860    of message, unlike the send side where we can safely send everything about
3861    the lkb for any type of message */
3862
3863 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3864 {
3865         lkb->lkb_exflags = ms->m_exflags;
3866         lkb->lkb_sbflags = ms->m_sbflags;
3867         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3868                          (ms->m_flags & 0x0000FFFF);
3869 }
3870
3871 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3872 {
3873         if (ms->m_flags == DLM_IFL_STUB_MS)
3874                 return;
3875
3876         lkb->lkb_sbflags = ms->m_sbflags;
3877         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3878                          (ms->m_flags & 0x0000FFFF);
3879 }
3880
3881 static int receive_extralen(struct dlm_message *ms)
3882 {
3883         return (ms->m_header.h_length - sizeof(struct dlm_message));
3884 }
3885
3886 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3887                        struct dlm_message *ms)
3888 {
3889         int len;
3890
3891         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3892                 if (!lkb->lkb_lvbptr)
3893                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3894                 if (!lkb->lkb_lvbptr)
3895                         return -ENOMEM;
3896                 len = receive_extralen(ms);
3897                 if (len > ls->ls_lvblen)
3898                         len = ls->ls_lvblen;
3899                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3900         }
3901         return 0;
3902 }
3903
3904 static void fake_bastfn(void *astparam, int mode)
3905 {
3906         log_print("fake_bastfn should not be called");
3907 }
3908
3909 static void fake_astfn(void *astparam)
3910 {
3911         log_print("fake_astfn should not be called");
3912 }
3913
3914 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3915                                 struct dlm_message *ms)
3916 {
3917         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3918         lkb->lkb_ownpid = ms->m_pid;
3919         lkb->lkb_remid = ms->m_lkid;
3920         lkb->lkb_grmode = DLM_LOCK_IV;
3921         lkb->lkb_rqmode = ms->m_rqmode;
3922
3923         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3924         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3925
3926         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3927                 /* lkb was just created so there won't be an lvb yet */
3928                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3929                 if (!lkb->lkb_lvbptr)
3930                         return -ENOMEM;
3931         }
3932
3933         return 0;
3934 }
3935
3936 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3937                                 struct dlm_message *ms)
3938 {
3939         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3940                 return -EBUSY;
3941
3942         if (receive_lvb(ls, lkb, ms))
3943                 return -ENOMEM;
3944
3945         lkb->lkb_rqmode = ms->m_rqmode;
3946         lkb->lkb_lvbseq = ms->m_lvbseq;
3947
3948         return 0;
3949 }
3950
3951 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3952                                struct dlm_message *ms)
3953 {
3954         if (receive_lvb(ls, lkb, ms))
3955                 return -ENOMEM;
3956         return 0;
3957 }
3958
3959 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3960    uses to send a reply and that the remote end uses to process the reply. */
3961
3962 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3963 {
3964         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3965         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3966         lkb->lkb_remid = ms->m_lkid;
3967 }
3968
3969 /* This is called after the rsb is locked so that we can safely inspect
3970    fields in the lkb. */
3971
3972 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3973 {
3974         int from = ms->m_header.h_nodeid;
3975         int error = 0;
3976
3977         switch (ms->m_type) {
3978         case DLM_MSG_CONVERT:
3979         case DLM_MSG_UNLOCK:
3980         case DLM_MSG_CANCEL:
3981                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3982                         error = -EINVAL;
3983                 break;
3984
3985         case DLM_MSG_CONVERT_REPLY:
3986         case DLM_MSG_UNLOCK_REPLY:
3987         case DLM_MSG_CANCEL_REPLY:
3988         case DLM_MSG_GRANT:
3989         case DLM_MSG_BAST:
3990                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3991                         error = -EINVAL;
3992                 break;
3993
3994         case DLM_MSG_REQUEST_REPLY:
3995                 if (!is_process_copy(lkb))
3996                         error = -EINVAL;
3997                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3998                         error = -EINVAL;
3999                 break;
4000
4001         default:
4002                 error = -EINVAL;
4003         }
4004
4005         if (error)
4006                 log_error(lkb->lkb_resource->res_ls,
4007                           "ignore invalid message %d from %d %x %x %x %d",
4008                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
4009                           lkb->lkb_flags, lkb->lkb_nodeid);
4010         return error;
4011 }
4012
4013 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4014 {
4015         char name[DLM_RESNAME_MAXLEN + 1];
4016         struct dlm_message *ms;
4017         struct dlm_mhandle *mh;
4018         struct dlm_rsb *r;
4019         uint32_t hash, b;
4020         int rv, dir_nodeid;
4021
4022         memset(name, 0, sizeof(name));
4023         memcpy(name, ms_name, len);
4024
4025         hash = jhash(name, len, 0);
4026         b = hash & (ls->ls_rsbtbl_size - 1);
4027
4028         dir_nodeid = dlm_hash2nodeid(ls, hash);
4029
4030         log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4031
4032         spin_lock(&ls->ls_rsbtbl[b].lock);
4033         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4034         if (!rv) {
4035                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4036                 log_error(ls, "repeat_remove on keep %s", name);
4037                 return;
4038         }
4039
4040         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4041         if (!rv) {
4042                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4043                 log_error(ls, "repeat_remove on toss %s", name);
4044                 return;
4045         }
4046
4047         /* use ls->remove_name2 to avoid conflict with shrink? */
4048
4049         spin_lock(&ls->ls_remove_spin);
4050         ls->ls_remove_len = len;
4051         memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4052         spin_unlock(&ls->ls_remove_spin);
4053         spin_unlock(&ls->ls_rsbtbl[b].lock);
4054
4055         rv = _create_message(ls, sizeof(struct dlm_message) + len,
4056                              dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4057         if (rv)
4058                 return;
4059
4060         memcpy(ms->m_extra, name, len);
4061         ms->m_hash = hash;
4062
4063         send_message(mh, ms);
4064
4065         spin_lock(&ls->ls_remove_spin);
4066         ls->ls_remove_len = 0;
4067         memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4068         spin_unlock(&ls->ls_remove_spin);
4069 }
4070
4071 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4072 {
4073         struct dlm_lkb *lkb;
4074         struct dlm_rsb *r;
4075         int from_nodeid;
4076         int error, namelen = 0;
4077
4078         from_nodeid = ms->m_header.h_nodeid;
4079
4080         error = create_lkb(ls, &lkb);
4081         if (error)
4082                 goto fail;
4083
4084         receive_flags(lkb, ms);
4085         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4086         error = receive_request_args(ls, lkb, ms);
4087         if (error) {
4088                 __put_lkb(ls, lkb);
4089                 goto fail;
4090         }
4091
4092         /* The dir node is the authority on whether we are the master
4093            for this rsb or not, so if the master sends us a request, we should
4094            recreate the rsb if we've destroyed it.   This race happens when we
4095            send a remove message to the dir node at the same time that the dir
4096            node sends us a request for the rsb. */
4097
4098         namelen = receive_extralen(ms);
4099
4100         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4101                          R_RECEIVE_REQUEST, &r);
4102         if (error) {
4103                 __put_lkb(ls, lkb);
4104                 goto fail;
4105         }
4106
4107         lock_rsb(r);
4108
4109         if (r->res_master_nodeid != dlm_our_nodeid()) {
4110                 error = validate_master_nodeid(ls, r, from_nodeid);
4111                 if (error) {
4112                         unlock_rsb(r);
4113                         put_rsb(r);
4114                         __put_lkb(ls, lkb);
4115                         goto fail;
4116                 }
4117         }
4118
4119         attach_lkb(r, lkb);
4120         error = do_request(r, lkb);
4121         send_request_reply(r, lkb, error);
4122         do_request_effects(r, lkb, error);
4123
4124         unlock_rsb(r);
4125         put_rsb(r);
4126
4127         if (error == -EINPROGRESS)
4128                 error = 0;
4129         if (error)
4130                 dlm_put_lkb(lkb);
4131         return 0;
4132
4133  fail:
4134         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4135            and do this receive_request again from process_lookup_list once
4136            we get the lookup reply.  This would avoid a many repeated
4137            ENOTBLK request failures when the lookup reply designating us
4138            as master is delayed. */
4139
4140         /* We could repeatedly return -EBADR here if our send_remove() is
4141            delayed in being sent/arriving/being processed on the dir node.
4142            Another node would repeatedly lookup up the master, and the dir
4143            node would continue returning our nodeid until our send_remove
4144            took effect.
4145
4146            We send another remove message in case our previous send_remove
4147            was lost/ignored/missed somehow. */
4148
4149         if (error != -ENOTBLK) {
4150                 log_limit(ls, "receive_request %x from %d %d",
4151                           ms->m_lkid, from_nodeid, error);
4152         }
4153
4154         if (namelen && error == -EBADR) {
4155                 send_repeat_remove(ls, ms->m_extra, namelen);
4156                 msleep(1000);
4157         }
4158
4159         setup_stub_lkb(ls, ms);
4160         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4161         return error;
4162 }
4163
4164 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4165 {
4166         struct dlm_lkb *lkb;
4167         struct dlm_rsb *r;
4168         int error, reply = 1;
4169
4170         error = find_lkb(ls, ms->m_remid, &lkb);
4171         if (error)
4172                 goto fail;
4173
4174         if (lkb->lkb_remid != ms->m_lkid) {
4175                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4176                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4177                           (unsigned long long)lkb->lkb_recover_seq,
4178                           ms->m_header.h_nodeid, ms->m_lkid);
4179                 error = -ENOENT;
4180                 goto fail;
4181         }
4182
4183         r = lkb->lkb_resource;
4184
4185         hold_rsb(r);
4186         lock_rsb(r);
4187
4188         error = validate_message(lkb, ms);
4189         if (error)
4190                 goto out;
4191
4192         receive_flags(lkb, ms);
4193
4194         error = receive_convert_args(ls, lkb, ms);
4195         if (error) {
4196                 send_convert_reply(r, lkb, error);
4197                 goto out;
4198         }
4199
4200         reply = !down_conversion(lkb);
4201
4202         error = do_convert(r, lkb);
4203         if (reply)
4204                 send_convert_reply(r, lkb, error);
4205         do_convert_effects(r, lkb, error);
4206  out:
4207         unlock_rsb(r);
4208         put_rsb(r);
4209         dlm_put_lkb(lkb);
4210         return 0;
4211
4212  fail:
4213         setup_stub_lkb(ls, ms);
4214         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4215         return error;
4216 }
4217
4218 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4219 {
4220         struct dlm_lkb *lkb;
4221         struct dlm_rsb *r;
4222         int error;
4223
4224         error = find_lkb(ls, ms->m_remid, &lkb);
4225         if (error)
4226                 goto fail;
4227
4228         if (lkb->lkb_remid != ms->m_lkid) {
4229                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4230                           lkb->lkb_id, lkb->lkb_remid,
4231                           ms->m_header.h_nodeid, ms->m_lkid);
4232                 error = -ENOENT;
4233                 goto fail;
4234         }
4235
4236         r = lkb->lkb_resource;
4237
4238         hold_rsb(r);
4239         lock_rsb(r);
4240
4241         error = validate_message(lkb, ms);
4242         if (error)
4243                 goto out;
4244
4245         receive_flags(lkb, ms);
4246
4247         error = receive_unlock_args(ls, lkb, ms);
4248         if (error) {
4249                 send_unlock_reply(r, lkb, error);
4250                 goto out;
4251         }
4252
4253         error = do_unlock(r, lkb);
4254         send_unlock_reply(r, lkb, error);
4255         do_unlock_effects(r, lkb, error);
4256  out:
4257         unlock_rsb(r);
4258         put_rsb(r);
4259         dlm_put_lkb(lkb);
4260         return 0;
4261
4262  fail:
4263         setup_stub_lkb(ls, ms);
4264         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4265         return error;
4266 }
4267
4268 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4269 {
4270         struct dlm_lkb *lkb;
4271         struct dlm_rsb *r;
4272         int error;
4273
4274         error = find_lkb(ls, ms->m_remid, &lkb);
4275         if (error)
4276                 goto fail;
4277
4278         receive_flags(lkb, ms);
4279
4280         r = lkb->lkb_resource;
4281
4282         hold_rsb(r);
4283         lock_rsb(r);
4284
4285         error = validate_message(lkb, ms);
4286         if (error)
4287                 goto out;
4288
4289         error = do_cancel(r, lkb);
4290         send_cancel_reply(r, lkb, error);
4291         do_cancel_effects(r, lkb, error);
4292  out:
4293         unlock_rsb(r);
4294         put_rsb(r);
4295         dlm_put_lkb(lkb);
4296         return 0;
4297
4298  fail:
4299         setup_stub_lkb(ls, ms);
4300         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4301         return error;
4302 }
4303
4304 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4305 {
4306         struct dlm_lkb *lkb;
4307         struct dlm_rsb *r;
4308         int error;
4309
4310         error = find_lkb(ls, ms->m_remid, &lkb);
4311         if (error)
4312                 return error;
4313
4314         r = lkb->lkb_resource;
4315
4316         hold_rsb(r);
4317         lock_rsb(r);
4318
4319         error = validate_message(lkb, ms);
4320         if (error)
4321                 goto out;
4322
4323         receive_flags_reply(lkb, ms);
4324         if (is_altmode(lkb))
4325                 munge_altmode(lkb, ms);
4326         grant_lock_pc(r, lkb, ms);
4327         queue_cast(r, lkb, 0);
4328  out:
4329         unlock_rsb(r);
4330         put_rsb(r);
4331         dlm_put_lkb(lkb);
4332         return 0;
4333 }
4334
4335 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4336 {
4337         struct dlm_lkb *lkb;
4338         struct dlm_rsb *r;
4339         int error;
4340
4341         error = find_lkb(ls, ms->m_remid, &lkb);
4342         if (error)
4343                 return error;
4344
4345         r = lkb->lkb_resource;
4346
4347         hold_rsb(r);
4348         lock_rsb(r);
4349
4350         error = validate_message(lkb, ms);
4351         if (error)
4352                 goto out;
4353
4354         queue_bast(r, lkb, ms->m_bastmode);
4355         lkb->lkb_highbast = ms->m_bastmode;
4356  out:
4357         unlock_rsb(r);
4358         put_rsb(r);
4359         dlm_put_lkb(lkb);
4360         return 0;
4361 }
4362
4363 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4364 {
4365         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4366
4367         from_nodeid = ms->m_header.h_nodeid;
4368         our_nodeid = dlm_our_nodeid();
4369
4370         len = receive_extralen(ms);
4371
4372         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4373                                   &ret_nodeid, NULL);
4374
4375         /* Optimization: we're master so treat lookup as a request */
4376         if (!error && ret_nodeid == our_nodeid) {
4377                 receive_request(ls, ms);
4378                 return;
4379         }
4380         send_lookup_reply(ls, ms, ret_nodeid, error);
4381 }
4382
4383 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4384 {
4385         char name[DLM_RESNAME_MAXLEN+1];
4386         struct dlm_rsb *r;
4387         uint32_t hash, b;
4388         int rv, len, dir_nodeid, from_nodeid;
4389
4390         from_nodeid = ms->m_header.h_nodeid;
4391
4392         len = receive_extralen(ms);
4393
4394         if (len > DLM_RESNAME_MAXLEN) {
4395                 log_error(ls, "receive_remove from %d bad len %d",
4396                           from_nodeid, len);
4397                 return;
4398         }
4399
4400         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4401         if (dir_nodeid != dlm_our_nodeid()) {
4402                 log_error(ls, "receive_remove from %d bad nodeid %d",
4403                           from_nodeid, dir_nodeid);
4404                 return;
4405         }
4406
4407         /* Look for name on rsbtbl.toss, if it's there, kill it.
4408            If it's on rsbtbl.keep, it's being used, and we should ignore this
4409            message.  This is an expected race between the dir node sending a
4410            request to the master node at the same time as the master node sends
4411            a remove to the dir node.  The resolution to that race is for the
4412            dir node to ignore the remove message, and the master node to
4413            recreate the master rsb when it gets a request from the dir node for
4414            an rsb it doesn't have. */
4415
4416         memset(name, 0, sizeof(name));
4417         memcpy(name, ms->m_extra, len);
4418
4419         hash = jhash(name, len, 0);
4420         b = hash & (ls->ls_rsbtbl_size - 1);
4421
4422         spin_lock(&ls->ls_rsbtbl[b].lock);
4423
4424         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4425         if (rv) {
4426                 /* verify the rsb is on keep list per comment above */
4427                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4428                 if (rv) {
4429                         /* should not happen */
4430                         log_error(ls, "receive_remove from %d not found %s",
4431                                   from_nodeid, name);
4432                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4433                         return;
4434                 }
4435                 if (r->res_master_nodeid != from_nodeid) {
4436                         /* should not happen */
4437                         log_error(ls, "receive_remove keep from %d master %d",
4438                                   from_nodeid, r->res_master_nodeid);
4439                         dlm_print_rsb(r);
4440                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4441                         return;
4442                 }
4443
4444                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4445                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4446                           name);
4447                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4448                 return;
4449         }
4450
4451         if (r->res_master_nodeid != from_nodeid) {
4452                 log_error(ls, "receive_remove toss from %d master %d",
4453                           from_nodeid, r->res_master_nodeid);
4454                 dlm_print_rsb(r);
4455                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4456                 return;
4457         }
4458
4459         if (kref_put(&r->res_ref, kill_rsb)) {
4460                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4461                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4462                 dlm_free_rsb(r);
4463         } else {
4464                 log_error(ls, "receive_remove from %d rsb ref error",
4465                           from_nodeid);
4466                 dlm_print_rsb(r);
4467                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4468         }
4469 }
4470
4471 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4472 {
4473         do_purge(ls, ms->m_nodeid, ms->m_pid);
4474 }
4475
4476 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4477 {
4478         struct dlm_lkb *lkb;
4479         struct dlm_rsb *r;
4480         int error, mstype, result;
4481         int from_nodeid = ms->m_header.h_nodeid;
4482
4483         error = find_lkb(ls, ms->m_remid, &lkb);
4484         if (error)
4485                 return error;
4486
4487         r = lkb->lkb_resource;
4488         hold_rsb(r);
4489         lock_rsb(r);
4490
4491         error = validate_message(lkb, ms);
4492         if (error)
4493                 goto out;
4494
4495         mstype = lkb->lkb_wait_type;
4496         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4497         if (error) {
4498                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4499                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4500                 dlm_dump_rsb(r);
4501                 goto out;
4502         }
4503
4504         /* Optimization: the dir node was also the master, so it took our
4505            lookup as a request and sent request reply instead of lookup reply */
4506         if (mstype == DLM_MSG_LOOKUP) {
4507                 r->res_master_nodeid = from_nodeid;
4508                 r->res_nodeid = from_nodeid;
4509                 lkb->lkb_nodeid = from_nodeid;
4510         }
4511
4512         /* this is the value returned from do_request() on the master */
4513         result = ms->m_result;
4514
4515         switch (result) {
4516         case -EAGAIN:
4517                 /* request would block (be queued) on remote master */
4518                 queue_cast(r, lkb, -EAGAIN);
4519                 confirm_master(r, -EAGAIN);
4520                 unhold_lkb(lkb); /* undoes create_lkb() */
4521                 break;
4522
4523         case -EINPROGRESS:
4524         case 0:
4525                 /* request was queued or granted on remote master */
4526                 receive_flags_reply(lkb, ms);
4527                 lkb->lkb_remid = ms->m_lkid;
4528                 if (is_altmode(lkb))
4529                         munge_altmode(lkb, ms);
4530                 if (result) {
4531                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4532                         add_timeout(lkb);
4533                 } else {
4534                         grant_lock_pc(r, lkb, ms);
4535                         queue_cast(r, lkb, 0);
4536                 }
4537                 confirm_master(r, result);
4538                 break;
4539
4540         case -EBADR:
4541         case -ENOTBLK:
4542                 /* find_rsb failed to find rsb or rsb wasn't master */
4543                 log_limit(ls, "receive_request_reply %x from %d %d "
4544                           "master %d dir %d first %x %s", lkb->lkb_id,
4545                           from_nodeid, result, r->res_master_nodeid,
4546                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4547
4548                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4549                     r->res_master_nodeid != dlm_our_nodeid()) {
4550                         /* cause _request_lock->set_master->send_lookup */
4551                         r->res_master_nodeid = 0;
4552                         r->res_nodeid = -1;
4553                         lkb->lkb_nodeid = -1;
4554                 }
4555
4556                 if (is_overlap(lkb)) {
4557                         /* we'll ignore error in cancel/unlock reply */
4558                         queue_cast_overlap(r, lkb);
4559                         confirm_master(r, result);
4560                         unhold_lkb(lkb); /* undoes create_lkb() */
4561                 } else {
4562                         _request_lock(r, lkb);
4563
4564                         if (r->res_master_nodeid == dlm_our_nodeid())
4565                                 confirm_master(r, 0);
4566                 }
4567                 break;
4568
4569         default:
4570                 log_error(ls, "receive_request_reply %x error %d",
4571                           lkb->lkb_id, result);
4572         }
4573
4574         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4575                 log_debug(ls, "receive_request_reply %x result %d unlock",
4576                           lkb->lkb_id, result);
4577                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4578                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4579                 send_unlock(r, lkb);
4580         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4581                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4582                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4583                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4584                 send_cancel(r, lkb);
4585         } else {
4586                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4587                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4588         }
4589  out:
4590         unlock_rsb(r);
4591         put_rsb(r);
4592         dlm_put_lkb(lkb);
4593         return 0;
4594 }
4595
4596 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4597                                     struct dlm_message *ms)
4598 {
4599         /* this is the value returned from do_convert() on the master */
4600         switch (ms->m_result) {
4601         case -EAGAIN:
4602                 /* convert would block (be queued) on remote master */
4603                 queue_cast(r, lkb, -EAGAIN);
4604                 break;
4605
4606         case -EDEADLK:
4607                 receive_flags_reply(lkb, ms);
4608                 revert_lock_pc(r, lkb);
4609                 queue_cast(r, lkb, -EDEADLK);
4610                 break;
4611
4612         case -EINPROGRESS:
4613                 /* convert was queued on remote master */
4614                 receive_flags_reply(lkb, ms);
4615                 if (is_demoted(lkb))
4616                         munge_demoted(lkb);
4617                 del_lkb(r, lkb);
4618                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4619                 add_timeout(lkb);
4620                 break;
4621
4622         case 0:
4623                 /* convert was granted on remote master */
4624                 receive_flags_reply(lkb, ms);
4625                 if (is_demoted(lkb))
4626                         munge_demoted(lkb);
4627                 grant_lock_pc(r, lkb, ms);
4628                 queue_cast(r, lkb, 0);
4629                 break;
4630
4631         default:
4632                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4633                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4634                           ms->m_result);
4635                 dlm_print_rsb(r);
4636                 dlm_print_lkb(lkb);
4637         }
4638 }
4639
4640 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4641 {
4642         struct dlm_rsb *r = lkb->lkb_resource;
4643         int error;
4644
4645         hold_rsb(r);
4646         lock_rsb(r);
4647
4648         error = validate_message(lkb, ms);
4649         if (error)
4650                 goto out;
4651
4652         /* stub reply can happen with waiters_mutex held */
4653         error = remove_from_waiters_ms(lkb, ms);
4654         if (error)
4655                 goto out;
4656
4657         __receive_convert_reply(r, lkb, ms);
4658  out:
4659         unlock_rsb(r);
4660         put_rsb(r);
4661 }
4662
4663 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4664 {
4665         struct dlm_lkb *lkb;
4666         int error;
4667
4668         error = find_lkb(ls, ms->m_remid, &lkb);
4669         if (error)
4670                 return error;
4671
4672         _receive_convert_reply(lkb, ms);
4673         dlm_put_lkb(lkb);
4674         return 0;
4675 }
4676
4677 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4678 {
4679         struct dlm_rsb *r = lkb->lkb_resource;
4680         int error;
4681
4682         hold_rsb(r);
4683         lock_rsb(r);
4684
4685         error = validate_message(lkb, ms);
4686         if (error)
4687                 goto out;
4688
4689         /* stub reply can happen with waiters_mutex held */
4690         error = remove_from_waiters_ms(lkb, ms);
4691         if (error)
4692                 goto out;
4693
4694         /* this is the value returned from do_unlock() on the master */
4695
4696         switch (ms->m_result) {
4697         case -DLM_EUNLOCK:
4698                 receive_flags_reply(lkb, ms);
4699                 remove_lock_pc(r, lkb);
4700                 queue_cast(r, lkb, -DLM_EUNLOCK);
4701                 break;
4702         case -ENOENT:
4703                 break;
4704         default:
4705                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4706                           lkb->lkb_id, ms->m_result);
4707         }
4708  out:
4709         unlock_rsb(r);
4710         put_rsb(r);
4711 }
4712
4713 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4714 {
4715         struct dlm_lkb *lkb;
4716         int error;
4717
4718         error = find_lkb(ls, ms->m_remid, &lkb);
4719         if (error)
4720                 return error;
4721
4722         _receive_unlock_reply(lkb, ms);
4723         dlm_put_lkb(lkb);
4724         return 0;
4725 }
4726
4727 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4728 {
4729         struct dlm_rsb *r = lkb->lkb_resource;
4730         int error;
4731
4732         hold_rsb(r);
4733         lock_rsb(r);
4734
4735         error = validate_message(lkb, ms);
4736         if (error)
4737                 goto out;
4738
4739         /* stub reply can happen with waiters_mutex held */
4740         error = remove_from_waiters_ms(lkb, ms);
4741         if (error)
4742                 goto out;
4743
4744         /* this is the value returned from do_cancel() on the master */
4745
4746         switch (ms->m_result) {
4747         case -DLM_ECANCEL:
4748                 receive_flags_reply(lkb, ms);
4749                 revert_lock_pc(r, lkb);
4750                 queue_cast(r, lkb, -DLM_ECANCEL);
4751                 break;
4752         case 0:
4753                 break;
4754         default:
4755                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4756                           lkb->lkb_id, ms->m_result);
4757         }
4758  out:
4759         unlock_rsb(r);
4760         put_rsb(r);
4761 }
4762
4763 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4764 {
4765         struct dlm_lkb *lkb;
4766         int error;
4767
4768         error = find_lkb(ls, ms->m_remid, &lkb);
4769         if (error)
4770                 return error;
4771
4772         _receive_cancel_reply(lkb, ms);
4773         dlm_put_lkb(lkb);
4774         return 0;
4775 }
4776
4777 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4778 {
4779         struct dlm_lkb *lkb;
4780         struct dlm_rsb *r;
4781         int error, ret_nodeid;
4782         int do_lookup_list = 0;
4783
4784         error = find_lkb(ls, ms->m_lkid, &lkb);
4785         if (error) {
4786                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4787                 return;
4788         }
4789
4790         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4791            FIXME: will a non-zero error ever be returned? */
4792
4793         r = lkb->lkb_resource;
4794         hold_rsb(r);
4795         lock_rsb(r);
4796
4797         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4798         if (error)
4799                 goto out;
4800
4801         ret_nodeid = ms->m_nodeid;
4802
4803         /* We sometimes receive a request from the dir node for this
4804            rsb before we've received the dir node's loookup_reply for it.
4805            The request from the dir node implies we're the master, so we set
4806            ourself as master in receive_request_reply, and verify here that
4807            we are indeed the master. */
4808
4809         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4810                 /* This should never happen */
4811                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4812                           "master %d dir %d our %d first %x %s",
4813                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4814                           r->res_master_nodeid, r->res_dir_nodeid,
4815                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4816         }
4817
4818         if (ret_nodeid == dlm_our_nodeid()) {
4819                 r->res_master_nodeid = ret_nodeid;
4820                 r->res_nodeid = 0;
4821                 do_lookup_list = 1;
4822                 r->res_first_lkid = 0;
4823         } else if (ret_nodeid == -1) {
4824                 /* the remote node doesn't believe it's the dir node */
4825                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4826                           lkb->lkb_id, ms->m_header.h_nodeid);
4827                 r->res_master_nodeid = 0;
4828                 r->res_nodeid = -1;
4829                 lkb->lkb_nodeid = -1;
4830         } else {
4831                 /* set_master() will set lkb_nodeid from r */
4832                 r->res_master_nodeid = ret_nodeid;
4833                 r->res_nodeid = ret_nodeid;
4834         }
4835
4836         if (is_overlap(lkb)) {
4837                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4838                           lkb->lkb_id, lkb->lkb_flags);
4839                 queue_cast_overlap(r, lkb);
4840                 unhold_lkb(lkb); /* undoes create_lkb() */
4841                 goto out_list;
4842         }
4843
4844         _request_lock(r, lkb);
4845
4846  out_list:
4847         if (do_lookup_list)
4848                 process_lookup_list(r);
4849  out:
4850         unlock_rsb(r);
4851         put_rsb(r);
4852         dlm_put_lkb(lkb);
4853 }
4854
4855 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4856                              uint32_t saved_seq)
4857 {
4858         int error = 0, noent = 0;
4859
4860         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4861                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4862                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4863                           ms->m_remid, ms->m_result);
4864                 return;
4865         }
4866
4867         switch (ms->m_type) {
4868
4869         /* messages sent to a master node */
4870
4871         case DLM_MSG_REQUEST:
4872                 error = receive_request(ls, ms);
4873                 break;
4874
4875         case DLM_MSG_CONVERT:
4876                 error = receive_convert(ls, ms);
4877                 break;
4878
4879         case DLM_MSG_UNLOCK:
4880                 error = receive_unlock(ls, ms);
4881                 break;
4882
4883         case DLM_MSG_CANCEL:
4884                 noent = 1;
4885                 error = receive_cancel(ls, ms);
4886                 break;
4887
4888         /* messages sent from a master node (replies to above) */
4889
4890         case DLM_MSG_REQUEST_REPLY:
4891                 error = receive_request_reply(ls, ms);
4892                 break;
4893
4894         case DLM_MSG_CONVERT_REPLY:
4895                 error = receive_convert_reply(ls, ms);
4896                 break;
4897
4898         case DLM_MSG_UNLOCK_REPLY:
4899                 error = receive_unlock_reply(ls, ms);
4900                 break;
4901
4902         case DLM_MSG_CANCEL_REPLY:
4903                 error = receive_cancel_reply(ls, ms);
4904                 break;
4905
4906         /* messages sent from a master node (only two types of async msg) */
4907
4908         case DLM_MSG_GRANT:
4909                 noent = 1;
4910                 error = receive_grant(ls, ms);
4911                 break;
4912
4913         case DLM_MSG_BAST:
4914                 noent = 1;
4915                 error = receive_bast(ls, ms);
4916                 break;
4917
4918         /* messages sent to a dir node */
4919
4920         case DLM_MSG_LOOKUP:
4921                 receive_lookup(ls, ms);
4922                 break;
4923
4924         case DLM_MSG_REMOVE:
4925                 receive_remove(ls, ms);
4926                 break;
4927
4928         /* messages sent from a dir node (remove has no reply) */
4929
4930         case DLM_MSG_LOOKUP_REPLY:
4931                 receive_lookup_reply(ls, ms);
4932                 break;
4933
4934         /* other messages */
4935
4936         case DLM_MSG_PURGE:
4937                 receive_purge(ls, ms);
4938                 break;
4939
4940         default:
4941                 log_error(ls, "unknown message type %d", ms->m_type);
4942         }
4943
4944         /*
4945          * When checking for ENOENT, we're checking the result of
4946          * find_lkb(m_remid):
4947          *
4948          * The lock id referenced in the message wasn't found.  This may
4949          * happen in normal usage for the async messages and cancel, so
4950          * only use log_debug for them.
4951          *
4952          * Some errors are expected and normal.
4953          */
4954
4955         if (error == -ENOENT && noent) {
4956                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4957                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4958                           ms->m_lkid, saved_seq);
4959         } else if (error == -ENOENT) {
4960                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4961                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4962                           ms->m_lkid, saved_seq);
4963
4964                 if (ms->m_type == DLM_MSG_CONVERT)
4965                         dlm_dump_rsb_hash(ls, ms->m_hash);
4966         }
4967
4968         if (error == -EINVAL) {
4969                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4970                           "saved_seq %u",
4971                           ms->m_type, ms->m_header.h_nodeid,
4972                           ms->m_lkid, ms->m_remid, saved_seq);
4973         }
4974 }
4975
4976 /* If the lockspace is in recovery mode (locking stopped), then normal
4977    messages are saved on the requestqueue for processing after recovery is
4978    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4979    messages off the requestqueue before we process new ones. This occurs right
4980    after recovery completes when we transition from saving all messages on
4981    requestqueue, to processing all the saved messages, to processing new
4982    messages as they arrive. */
4983
4984 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4985                                 int nodeid)
4986 {
4987         if (dlm_locking_stopped(ls)) {
4988                 /* If we were a member of this lockspace, left, and rejoined,
4989                    other nodes may still be sending us messages from the
4990                    lockspace generation before we left. */
4991                 if (!ls->ls_generation) {
4992                         log_limit(ls, "receive %d from %d ignore old gen",
4993                                   ms->m_type, nodeid);
4994                         return;
4995                 }
4996
4997                 dlm_add_requestqueue(ls, nodeid, ms);
4998         } else {
4999                 dlm_wait_requestqueue(ls);
5000                 _receive_message(ls, ms, 0);
5001         }
5002 }
5003
5004 /* This is called by dlm_recoverd to process messages that were saved on
5005    the requestqueue. */
5006
5007 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5008                                uint32_t saved_seq)
5009 {
5010         _receive_message(ls, ms, saved_seq);
5011 }
5012
5013 /* This is called by the midcomms layer when something is received for
5014    the lockspace.  It could be either a MSG (normal message sent as part of
5015    standard locking activity) or an RCOM (recovery message sent as part of
5016    lockspace recovery). */
5017
5018 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5019 {
5020         struct dlm_header *hd = &p->header;
5021         struct dlm_ls *ls;
5022         int type = 0;
5023
5024         switch (hd->h_cmd) {
5025         case DLM_MSG:
5026                 dlm_message_in(&p->message);
5027                 type = p->message.m_type;
5028                 break;
5029         case DLM_RCOM:
5030                 dlm_rcom_in(&p->rcom);
5031                 type = p->rcom.rc_type;
5032                 break;
5033         default:
5034                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5035                 return;
5036         }
5037
5038         if (hd->h_nodeid != nodeid) {
5039                 log_print("invalid h_nodeid %d from %d lockspace %x",
5040                           hd->h_nodeid, nodeid, hd->h_lockspace);
5041                 return;
5042         }
5043
5044         ls = dlm_find_lockspace_global(hd->h_lockspace);
5045         if (!ls) {
5046                 if (dlm_config.ci_log_debug) {
5047                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5048                                 "%u from %d cmd %d type %d\n",
5049                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
5050                 }
5051
5052                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5053                         dlm_send_ls_not_ready(nodeid, &p->rcom);
5054                 return;
5055         }
5056
5057         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5058            be inactive (in this ls) before transitioning to recovery mode */
5059
5060         down_read(&ls->ls_recv_active);
5061         if (hd->h_cmd == DLM_MSG)
5062                 dlm_receive_message(ls, &p->message, nodeid);
5063         else
5064                 dlm_receive_rcom(ls, &p->rcom, nodeid);
5065         up_read(&ls->ls_recv_active);
5066
5067         dlm_put_lockspace(ls);
5068 }
5069
5070 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5071                                    struct dlm_message *ms_stub)
5072 {
5073         if (middle_conversion(lkb)) {
5074                 hold_lkb(lkb);
5075                 memset(ms_stub, 0, sizeof(struct dlm_message));
5076                 ms_stub->m_flags = DLM_IFL_STUB_MS;
5077                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5078                 ms_stub->m_result = -EINPROGRESS;
5079                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5080                 _receive_convert_reply(lkb, ms_stub);
5081
5082                 /* Same special case as in receive_rcom_lock_args() */
5083                 lkb->lkb_grmode = DLM_LOCK_IV;
5084                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5085                 unhold_lkb(lkb);
5086
5087         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5088                 lkb->lkb_flags |= DLM_IFL_RESEND;
5089         }
5090
5091         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5092            conversions are async; there's no reply from the remote master */
5093 }
5094
5095 /* A waiting lkb needs recovery if the master node has failed, or
5096    the master node is changing (only when no directory is used) */
5097
5098 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5099                                  int dir_nodeid)
5100 {
5101         if (dlm_no_directory(ls))
5102                 return 1;
5103
5104         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5105                 return 1;
5106
5107         return 0;
5108 }
5109
5110 /* Recovery for locks that are waiting for replies from nodes that are now
5111    gone.  We can just complete unlocks and cancels by faking a reply from the
5112    dead node.  Requests and up-conversions we flag to be resent after
5113    recovery.  Down-conversions can just be completed with a fake reply like
5114    unlocks.  Conversions between PR and CW need special attention. */
5115
5116 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5117 {
5118         struct dlm_lkb *lkb, *safe;
5119         struct dlm_message *ms_stub;
5120         int wait_type, stub_unlock_result, stub_cancel_result;
5121         int dir_nodeid;
5122
5123         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
5124         if (!ms_stub) {
5125                 log_error(ls, "dlm_recover_waiters_pre no mem");
5126                 return;
5127         }
5128
5129         mutex_lock(&ls->ls_waiters_mutex);
5130
5131         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5132
5133                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5134
5135                 /* exclude debug messages about unlocks because there can be so
5136                    many and they aren't very interesting */
5137
5138                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5139                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5140                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5141                                   lkb->lkb_id,
5142                                   lkb->lkb_remid,
5143                                   lkb->lkb_wait_type,
5144                                   lkb->lkb_resource->res_nodeid,
5145                                   lkb->lkb_nodeid,
5146                                   lkb->lkb_wait_nodeid,
5147                                   dir_nodeid);
5148                 }
5149
5150                 /* all outstanding lookups, regardless of destination  will be
5151                    resent after recovery is done */
5152
5153                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5154                         lkb->lkb_flags |= DLM_IFL_RESEND;
5155                         continue;
5156                 }
5157
5158                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5159                         continue;
5160
5161                 wait_type = lkb->lkb_wait_type;
5162                 stub_unlock_result = -DLM_EUNLOCK;
5163                 stub_cancel_result = -DLM_ECANCEL;
5164
5165                 /* Main reply may have been received leaving a zero wait_type,
5166                    but a reply for the overlapping op may not have been
5167                    received.  In that case we need to fake the appropriate
5168                    reply for the overlap op. */
5169
5170                 if (!wait_type) {
5171                         if (is_overlap_cancel(lkb)) {
5172                                 wait_type = DLM_MSG_CANCEL;
5173                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5174                                         stub_cancel_result = 0;
5175                         }
5176                         if (is_overlap_unlock(lkb)) {
5177                                 wait_type = DLM_MSG_UNLOCK;
5178                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5179                                         stub_unlock_result = -ENOENT;
5180                         }
5181
5182                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5183                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
5184                                   stub_cancel_result, stub_unlock_result);
5185                 }
5186
5187                 switch (wait_type) {
5188
5189                 case DLM_MSG_REQUEST:
5190                         lkb->lkb_flags |= DLM_IFL_RESEND;
5191                         break;
5192
5193                 case DLM_MSG_CONVERT:
5194                         recover_convert_waiter(ls, lkb, ms_stub);
5195                         break;
5196
5197                 case DLM_MSG_UNLOCK:
5198                         hold_lkb(lkb);
5199                         memset(ms_stub, 0, sizeof(struct dlm_message));
5200                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5201                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5202                         ms_stub->m_result = stub_unlock_result;
5203                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5204                         _receive_unlock_reply(lkb, ms_stub);
5205                         dlm_put_lkb(lkb);
5206                         break;
5207
5208                 case DLM_MSG_CANCEL:
5209                         hold_lkb(lkb);
5210                         memset(ms_stub, 0, sizeof(struct dlm_message));
5211                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5212                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5213                         ms_stub->m_result = stub_cancel_result;
5214                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5215                         _receive_cancel_reply(lkb, ms_stub);
5216                         dlm_put_lkb(lkb);
5217                         break;
5218
5219                 default:
5220                         log_error(ls, "invalid lkb wait_type %d %d",
5221                                   lkb->lkb_wait_type, wait_type);
5222                 }
5223                 schedule();
5224         }
5225         mutex_unlock(&ls->ls_waiters_mutex);
5226         kfree(ms_stub);
5227 }
5228
5229 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5230 {
5231         struct dlm_lkb *lkb;
5232         int found = 0;
5233
5234         mutex_lock(&ls->ls_waiters_mutex);
5235         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5236                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5237                         hold_lkb(lkb);
5238                         found = 1;
5239                         break;
5240                 }
5241         }
5242         mutex_unlock(&ls->ls_waiters_mutex);
5243
5244         if (!found)
5245                 lkb = NULL;
5246         return lkb;
5247 }
5248
5249 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5250    master or dir-node for r.  Processing the lkb may result in it being placed
5251    back on waiters. */
5252
5253 /* We do this after normal locking has been enabled and any saved messages
5254    (in requestqueue) have been processed.  We should be confident that at
5255    this point we won't get or process a reply to any of these waiting
5256    operations.  But, new ops may be coming in on the rsbs/locks here from
5257    userspace or remotely. */
5258
5259 /* there may have been an overlap unlock/cancel prior to recovery or after
5260    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5261    overlap flag would just have been set and nothing new sent.  we can be
5262    confident here than any replies to either the initial op or overlap ops
5263    prior to recovery have been received. */
5264
5265 int dlm_recover_waiters_post(struct dlm_ls *ls)
5266 {
5267         struct dlm_lkb *lkb;
5268         struct dlm_rsb *r;
5269         int error = 0, mstype, err, oc, ou;
5270
5271         while (1) {
5272                 if (dlm_locking_stopped(ls)) {
5273                         log_debug(ls, "recover_waiters_post aborted");
5274                         error = -EINTR;
5275                         break;
5276                 }
5277
5278                 lkb = find_resend_waiter(ls);
5279                 if (!lkb)
5280                         break;
5281
5282                 r = lkb->lkb_resource;
5283                 hold_rsb(r);
5284                 lock_rsb(r);
5285
5286                 mstype = lkb->lkb_wait_type;
5287                 oc = is_overlap_cancel(lkb);
5288                 ou = is_overlap_unlock(lkb);
5289                 err = 0;
5290
5291                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5292                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5293                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5294                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5295                           dlm_dir_nodeid(r), oc, ou);
5296
5297                 /* At this point we assume that we won't get a reply to any
5298                    previous op or overlap op on this lock.  First, do a big
5299                    remove_from_waiters() for all previous ops. */
5300
5301                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5302                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5303                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5304                 lkb->lkb_wait_type = 0;
5305                 lkb->lkb_wait_count = 0;
5306                 mutex_lock(&ls->ls_waiters_mutex);
5307                 list_del_init(&lkb->lkb_wait_reply);
5308                 mutex_unlock(&ls->ls_waiters_mutex);
5309                 unhold_lkb(lkb); /* for waiters list */
5310
5311                 if (oc || ou) {
5312                         /* do an unlock or cancel instead of resending */
5313                         switch (mstype) {
5314                         case DLM_MSG_LOOKUP:
5315                         case DLM_MSG_REQUEST:
5316                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5317                                                         -DLM_ECANCEL);
5318                                 unhold_lkb(lkb); /* undoes create_lkb() */
5319                                 break;
5320                         case DLM_MSG_CONVERT:
5321                                 if (oc) {
5322                                         queue_cast(r, lkb, -DLM_ECANCEL);
5323                                 } else {
5324                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5325                                         _unlock_lock(r, lkb);
5326                                 }
5327                                 break;
5328                         default:
5329                                 err = 1;
5330                         }
5331                 } else {
5332                         switch (mstype) {
5333                         case DLM_MSG_LOOKUP:
5334                         case DLM_MSG_REQUEST:
5335                                 _request_lock(r, lkb);
5336                                 if (is_master(r))
5337                                         confirm_master(r, 0);
5338                                 break;
5339                         case DLM_MSG_CONVERT:
5340                                 _convert_lock(r, lkb);
5341                                 break;
5342                         default:
5343                                 err = 1;
5344                         }
5345                 }
5346
5347                 if (err) {
5348                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5349                                   "dir_nodeid %d overlap %d %d",
5350                                   lkb->lkb_id, mstype, r->res_nodeid,
5351                                   dlm_dir_nodeid(r), oc, ou);
5352                 }
5353                 unlock_rsb(r);
5354                 put_rsb(r);
5355                 dlm_put_lkb(lkb);
5356         }
5357
5358         return error;
5359 }
5360
5361 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5362                               struct list_head *list)
5363 {
5364         struct dlm_lkb *lkb, *safe;
5365
5366         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5367                 if (!is_master_copy(lkb))
5368                         continue;
5369
5370                 /* don't purge lkbs we've added in recover_master_copy for
5371                    the current recovery seq */
5372
5373                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5374                         continue;
5375
5376                 del_lkb(r, lkb);
5377
5378                 /* this put should free the lkb */
5379                 if (!dlm_put_lkb(lkb))
5380                         log_error(ls, "purged mstcpy lkb not released");
5381         }
5382 }
5383
5384 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5385 {
5386         struct dlm_ls *ls = r->res_ls;
5387
5388         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5389         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5390         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5391 }
5392
5393 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5394                             struct list_head *list,
5395                             int nodeid_gone, unsigned int *count)
5396 {
5397         struct dlm_lkb *lkb, *safe;
5398
5399         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5400                 if (!is_master_copy(lkb))
5401                         continue;
5402
5403                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5404                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5405
5406                         /* tell recover_lvb to invalidate the lvb
5407                            because a node holding EX/PW failed */
5408                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5409                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5410                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5411                         }
5412
5413                         del_lkb(r, lkb);
5414
5415                         /* this put should free the lkb */
5416                         if (!dlm_put_lkb(lkb))
5417                                 log_error(ls, "purged dead lkb not released");
5418
5419                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5420
5421                         (*count)++;
5422                 }
5423         }
5424 }
5425
5426 /* Get rid of locks held by nodes that are gone. */
5427
5428 void dlm_recover_purge(struct dlm_ls *ls)
5429 {
5430         struct dlm_rsb *r;
5431         struct dlm_member *memb;
5432         int nodes_count = 0;
5433         int nodeid_gone = 0;
5434         unsigned int lkb_count = 0;
5435
5436         /* cache one removed nodeid to optimize the common
5437            case of a single node removed */
5438
5439         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5440                 nodes_count++;
5441                 nodeid_gone = memb->nodeid;
5442         }
5443
5444         if (!nodes_count)
5445                 return;
5446
5447         down_write(&ls->ls_root_sem);
5448         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5449                 hold_rsb(r);
5450                 lock_rsb(r);
5451                 if (is_master(r)) {
5452                         purge_dead_list(ls, r, &r->res_grantqueue,
5453                                         nodeid_gone, &lkb_count);
5454                         purge_dead_list(ls, r, &r->res_convertqueue,
5455                                         nodeid_gone, &lkb_count);
5456                         purge_dead_list(ls, r, &r->res_waitqueue,
5457                                         nodeid_gone, &lkb_count);
5458                 }
5459                 unlock_rsb(r);
5460                 unhold_rsb(r);
5461                 cond_resched();
5462         }
5463         up_write(&ls->ls_root_sem);
5464
5465         if (lkb_count)
5466                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5467                           lkb_count, nodes_count);
5468 }
5469
5470 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5471 {
5472         struct rb_node *n;
5473         struct dlm_rsb *r;
5474
5475         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5476         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5477                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5478
5479                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5480                         continue;
5481                 if (!is_master(r)) {
5482                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5483                         continue;
5484                 }
5485                 hold_rsb(r);
5486                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5487                 return r;
5488         }
5489         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5490         return NULL;
5491 }
5492
5493 /*
5494  * Attempt to grant locks on resources that we are the master of.
5495  * Locks may have become grantable during recovery because locks
5496  * from departed nodes have been purged (or not rebuilt), allowing
5497  * previously blocked locks to now be granted.  The subset of rsb's
5498  * we are interested in are those with lkb's on either the convert or
5499  * waiting queues.
5500  *
5501  * Simplest would be to go through each master rsb and check for non-empty
5502  * convert or waiting queues, and attempt to grant on those rsbs.
5503  * Checking the queues requires lock_rsb, though, for which we'd need
5504  * to release the rsbtbl lock.  This would make iterating through all
5505  * rsb's very inefficient.  So, we rely on earlier recovery routines
5506  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5507  * locks for.
5508  */
5509
5510 void dlm_recover_grant(struct dlm_ls *ls)
5511 {
5512         struct dlm_rsb *r;
5513         int bucket = 0;
5514         unsigned int count = 0;
5515         unsigned int rsb_count = 0;
5516         unsigned int lkb_count = 0;
5517
5518         while (1) {
5519                 r = find_grant_rsb(ls, bucket);
5520                 if (!r) {
5521                         if (bucket == ls->ls_rsbtbl_size - 1)
5522                                 break;
5523                         bucket++;
5524                         continue;
5525                 }
5526                 rsb_count++;
5527                 count = 0;
5528                 lock_rsb(r);
5529                 /* the RECOVER_GRANT flag is checked in the grant path */
5530                 grant_pending_locks(r, &count);
5531                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5532                 lkb_count += count;
5533                 confirm_master(r, 0);
5534                 unlock_rsb(r);
5535                 put_rsb(r);
5536                 cond_resched();
5537         }
5538
5539         if (lkb_count)
5540                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5541                           lkb_count, rsb_count);
5542 }
5543
5544 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5545                                          uint32_t remid)
5546 {
5547         struct dlm_lkb *lkb;
5548
5549         list_for_each_entry(lkb, head, lkb_statequeue) {
5550                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5551                         return lkb;
5552         }
5553         return NULL;
5554 }
5555
5556 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5557                                     uint32_t remid)
5558 {
5559         struct dlm_lkb *lkb;
5560
5561         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5562         if (lkb)
5563                 return lkb;
5564         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5565         if (lkb)
5566                 return lkb;
5567         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5568         if (lkb)
5569                 return lkb;
5570         return NULL;
5571 }
5572
5573 /* needs at least dlm_rcom + rcom_lock */
5574 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5575                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5576 {
5577         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5578
5579         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5580         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5581         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5582         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5583         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5584         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5585         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5586         lkb->lkb_rqmode = rl->rl_rqmode;
5587         lkb->lkb_grmode = rl->rl_grmode;
5588         /* don't set lkb_status because add_lkb wants to itself */
5589
5590         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5591         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5592
5593         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5594                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5595                          sizeof(struct rcom_lock);
5596                 if (lvblen > ls->ls_lvblen)
5597                         return -EINVAL;
5598                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5599                 if (!lkb->lkb_lvbptr)
5600                         return -ENOMEM;
5601                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5602         }
5603
5604         /* Conversions between PR and CW (middle modes) need special handling.
5605            The real granted mode of these converting locks cannot be determined
5606            until all locks have been rebuilt on the rsb (recover_conversion) */
5607
5608         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5609             middle_conversion(lkb)) {
5610                 rl->rl_status = DLM_LKSTS_CONVERT;
5611                 lkb->lkb_grmode = DLM_LOCK_IV;
5612                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5613         }
5614
5615         return 0;
5616 }
5617
5618 /* This lkb may have been recovered in a previous aborted recovery so we need
5619    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5620    If so we just send back a standard reply.  If not, we create a new lkb with
5621    the given values and send back our lkid.  We send back our lkid by sending
5622    back the rcom_lock struct we got but with the remid field filled in. */
5623
5624 /* needs at least dlm_rcom + rcom_lock */
5625 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5626 {
5627         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5628         struct dlm_rsb *r;
5629         struct dlm_lkb *lkb;
5630         uint32_t remid = 0;
5631         int from_nodeid = rc->rc_header.h_nodeid;
5632         int error;
5633
5634         if (rl->rl_parent_lkid) {
5635                 error = -EOPNOTSUPP;
5636                 goto out;
5637         }
5638
5639         remid = le32_to_cpu(rl->rl_lkid);
5640
5641         /* In general we expect the rsb returned to be R_MASTER, but we don't
5642            have to require it.  Recovery of masters on one node can overlap
5643            recovery of locks on another node, so one node can send us MSTCPY
5644            locks before we've made ourselves master of this rsb.  We can still
5645            add new MSTCPY locks that we receive here without any harm; when
5646            we make ourselves master, dlm_recover_masters() won't touch the
5647            MSTCPY locks we've received early. */
5648
5649         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5650                          from_nodeid, R_RECEIVE_RECOVER, &r);
5651         if (error)
5652                 goto out;
5653
5654         lock_rsb(r);
5655
5656         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5657                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5658                           from_nodeid, remid);
5659                 error = -EBADR;
5660                 goto out_unlock;
5661         }
5662
5663         lkb = search_remid(r, from_nodeid, remid);
5664         if (lkb) {
5665                 error = -EEXIST;
5666                 goto out_remid;
5667         }
5668
5669         error = create_lkb(ls, &lkb);
5670         if (error)
5671                 goto out_unlock;
5672
5673         error = receive_rcom_lock_args(ls, lkb, r, rc);
5674         if (error) {
5675                 __put_lkb(ls, lkb);
5676                 goto out_unlock;
5677         }
5678
5679         attach_lkb(r, lkb);
5680         add_lkb(r, lkb, rl->rl_status);
5681         error = 0;
5682         ls->ls_recover_locks_in++;
5683
5684         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5685                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5686
5687  out_remid:
5688         /* this is the new value returned to the lock holder for
5689            saving in its process-copy lkb */
5690         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5691
5692         lkb->lkb_recover_seq = ls->ls_recover_seq;
5693
5694  out_unlock:
5695         unlock_rsb(r);
5696         put_rsb(r);
5697  out:
5698         if (error && error != -EEXIST)
5699                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5700                           from_nodeid, remid, error);
5701         rl->rl_result = cpu_to_le32(error);
5702         return error;
5703 }
5704
5705 /* needs at least dlm_rcom + rcom_lock */
5706 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5707 {
5708         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5709         struct dlm_rsb *r;
5710         struct dlm_lkb *lkb;
5711         uint32_t lkid, remid;
5712         int error, result;
5713
5714         lkid = le32_to_cpu(rl->rl_lkid);
5715         remid = le32_to_cpu(rl->rl_remid);
5716         result = le32_to_cpu(rl->rl_result);
5717
5718         error = find_lkb(ls, lkid, &lkb);
5719         if (error) {
5720                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5721                           lkid, rc->rc_header.h_nodeid, remid, result);
5722                 return error;
5723         }
5724
5725         r = lkb->lkb_resource;
5726         hold_rsb(r);
5727         lock_rsb(r);
5728
5729         if (!is_process_copy(lkb)) {
5730                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5731                           lkid, rc->rc_header.h_nodeid, remid, result);
5732                 dlm_dump_rsb(r);
5733                 unlock_rsb(r);
5734                 put_rsb(r);
5735                 dlm_put_lkb(lkb);
5736                 return -EINVAL;
5737         }
5738
5739         switch (result) {
5740         case -EBADR:
5741                 /* There's a chance the new master received our lock before
5742                    dlm_recover_master_reply(), this wouldn't happen if we did
5743                    a barrier between recover_masters and recover_locks. */
5744
5745                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5746                           lkid, rc->rc_header.h_nodeid, remid, result);
5747         
5748                 dlm_send_rcom_lock(r, lkb);
5749                 goto out;
5750         case -EEXIST:
5751         case 0:
5752                 lkb->lkb_remid = remid;
5753                 break;
5754         default:
5755                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5756                           lkid, rc->rc_header.h_nodeid, remid, result);
5757         }
5758
5759         /* an ack for dlm_recover_locks() which waits for replies from
5760            all the locks it sends to new masters */
5761         dlm_recovered_lock(r);
5762  out:
5763         unlock_rsb(r);
5764         put_rsb(r);
5765         dlm_put_lkb(lkb);
5766
5767         return 0;
5768 }
5769
5770 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5771                      int mode, uint32_t flags, void *name, unsigned int namelen,
5772                      unsigned long timeout_cs)
5773 {
5774         struct dlm_lkb *lkb;
5775         struct dlm_args args;
5776         int error;
5777
5778         dlm_lock_recovery(ls);
5779
5780         error = create_lkb(ls, &lkb);
5781         if (error) {
5782                 kfree(ua);
5783                 goto out;
5784         }
5785
5786         if (flags & DLM_LKF_VALBLK) {
5787                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5788                 if (!ua->lksb.sb_lvbptr) {
5789                         kfree(ua);
5790                         __put_lkb(ls, lkb);
5791                         error = -ENOMEM;
5792                         goto out;
5793                 }
5794         }
5795
5796         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5797            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5798            lock and that lkb_astparam is the dlm_user_args structure. */
5799
5800         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5801                               fake_astfn, ua, fake_bastfn, &args);
5802         lkb->lkb_flags |= DLM_IFL_USER;
5803
5804         if (error) {
5805                 __put_lkb(ls, lkb);
5806                 goto out;
5807         }
5808
5809         error = request_lock(ls, lkb, name, namelen, &args);
5810
5811         switch (error) {
5812         case 0:
5813                 break;
5814         case -EINPROGRESS:
5815                 error = 0;
5816                 break;
5817         case -EAGAIN:
5818                 error = 0;
5819                 /* fall through */
5820         default:
5821                 __put_lkb(ls, lkb);
5822                 goto out;
5823         }
5824
5825         /* add this new lkb to the per-process list of locks */
5826         spin_lock(&ua->proc->locks_spin);
5827         hold_lkb(lkb);
5828         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5829         spin_unlock(&ua->proc->locks_spin);
5830  out:
5831         dlm_unlock_recovery(ls);
5832         return error;
5833 }
5834
5835 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5836                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5837                      unsigned long timeout_cs)
5838 {
5839         struct dlm_lkb *lkb;
5840         struct dlm_args args;
5841         struct dlm_user_args *ua;
5842         int error;
5843
5844         dlm_lock_recovery(ls);
5845
5846         error = find_lkb(ls, lkid, &lkb);
5847         if (error)
5848                 goto out;
5849
5850         /* user can change the params on its lock when it converts it, or
5851            add an lvb that didn't exist before */
5852
5853         ua = lkb->lkb_ua;
5854
5855         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5856                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5857                 if (!ua->lksb.sb_lvbptr) {
5858                         error = -ENOMEM;
5859                         goto out_put;
5860                 }
5861         }
5862         if (lvb_in && ua->lksb.sb_lvbptr)
5863                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5864
5865         ua->xid = ua_tmp->xid;
5866         ua->castparam = ua_tmp->castparam;
5867         ua->castaddr = ua_tmp->castaddr;
5868         ua->bastparam = ua_tmp->bastparam;
5869         ua->bastaddr = ua_tmp->bastaddr;
5870         ua->user_lksb = ua_tmp->user_lksb;
5871
5872         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5873                               fake_astfn, ua, fake_bastfn, &args);
5874         if (error)
5875                 goto out_put;
5876
5877         error = convert_lock(ls, lkb, &args);
5878
5879         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5880                 error = 0;
5881  out_put:
5882         dlm_put_lkb(lkb);
5883  out:
5884         dlm_unlock_recovery(ls);
5885         kfree(ua_tmp);
5886         return error;
5887 }
5888
5889 /*
5890  * The caller asks for an orphan lock on a given resource with a given mode.
5891  * If a matching lock exists, it's moved to the owner's list of locks and
5892  * the lkid is returned.
5893  */
5894
5895 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5896                      int mode, uint32_t flags, void *name, unsigned int namelen,
5897                      unsigned long timeout_cs, uint32_t *lkid)
5898 {
5899         struct dlm_lkb *lkb;
5900         struct dlm_user_args *ua;
5901         int found_other_mode = 0;
5902         int found = 0;
5903         int rv = 0;
5904
5905         mutex_lock(&ls->ls_orphans_mutex);
5906         list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) {
5907                 if (lkb->lkb_resource->res_length != namelen)
5908                         continue;
5909                 if (memcmp(lkb->lkb_resource->res_name, name, namelen))
5910                         continue;
5911                 if (lkb->lkb_grmode != mode) {
5912                         found_other_mode = 1;
5913                         continue;
5914                 }
5915
5916                 found = 1;
5917                 list_del_init(&lkb->lkb_ownqueue);
5918                 lkb->lkb_flags &= ~DLM_IFL_ORPHAN;
5919                 *lkid = lkb->lkb_id;
5920                 break;
5921         }
5922         mutex_unlock(&ls->ls_orphans_mutex);
5923
5924         if (!found && found_other_mode) {
5925                 rv = -EAGAIN;
5926                 goto out;
5927         }
5928
5929         if (!found) {
5930                 rv = -ENOENT;
5931                 goto out;
5932         }
5933
5934         lkb->lkb_exflags = flags;
5935         lkb->lkb_ownpid = (int) current->pid;
5936
5937         ua = lkb->lkb_ua;
5938
5939         ua->proc = ua_tmp->proc;
5940         ua->xid = ua_tmp->xid;
5941         ua->castparam = ua_tmp->castparam;
5942         ua->castaddr = ua_tmp->castaddr;
5943         ua->bastparam = ua_tmp->bastparam;
5944         ua->bastaddr = ua_tmp->bastaddr;
5945         ua->user_lksb = ua_tmp->user_lksb;
5946
5947         /*
5948          * The lkb reference from the ls_orphans list was not
5949          * removed above, and is now considered the reference
5950          * for the proc locks list.
5951          */
5952
5953         spin_lock(&ua->proc->locks_spin);
5954         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5955         spin_unlock(&ua->proc->locks_spin);
5956  out:
5957         kfree(ua_tmp);
5958         return rv;
5959 }
5960
5961 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5962                     uint32_t flags, uint32_t lkid, char *lvb_in)
5963 {
5964         struct dlm_lkb *lkb;
5965         struct dlm_args args;
5966         struct dlm_user_args *ua;
5967         int error;
5968
5969         dlm_lock_recovery(ls);
5970
5971         error = find_lkb(ls, lkid, &lkb);
5972         if (error)
5973                 goto out;
5974
5975         ua = lkb->lkb_ua;
5976
5977         if (lvb_in && ua->lksb.sb_lvbptr)
5978                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5979         if (ua_tmp->castparam)
5980                 ua->castparam = ua_tmp->castparam;
5981         ua->user_lksb = ua_tmp->user_lksb;
5982
5983         error = set_unlock_args(flags, ua, &args);
5984         if (error)
5985                 goto out_put;
5986
5987         error = unlock_lock(ls, lkb, &args);
5988
5989         if (error == -DLM_EUNLOCK)
5990                 error = 0;
5991         /* from validate_unlock_args() */
5992         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5993                 error = 0;
5994         if (error)
5995                 goto out_put;
5996
5997         spin_lock(&ua->proc->locks_spin);
5998         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5999         if (!list_empty(&lkb->lkb_ownqueue))
6000                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6001         spin_unlock(&ua->proc->locks_spin);
6002  out_put:
6003         dlm_put_lkb(lkb);
6004  out:
6005         dlm_unlock_recovery(ls);
6006         kfree(ua_tmp);
6007         return error;
6008 }
6009
6010 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6011                     uint32_t flags, uint32_t lkid)
6012 {
6013         struct dlm_lkb *lkb;
6014         struct dlm_args args;
6015         struct dlm_user_args *ua;
6016         int error;
6017
6018         dlm_lock_recovery(ls);
6019
6020         error = find_lkb(ls, lkid, &lkb);
6021         if (error)
6022                 goto out;
6023
6024         ua = lkb->lkb_ua;
6025         if (ua_tmp->castparam)
6026                 ua->castparam = ua_tmp->castparam;
6027         ua->user_lksb = ua_tmp->user_lksb;
6028
6029         error = set_unlock_args(flags, ua, &args);
6030         if (error)
6031                 goto out_put;
6032
6033         error = cancel_lock(ls, lkb, &args);
6034
6035         if (error == -DLM_ECANCEL)
6036                 error = 0;
6037         /* from validate_unlock_args() */
6038         if (error == -EBUSY)
6039                 error = 0;
6040  out_put:
6041         dlm_put_lkb(lkb);
6042  out:
6043         dlm_unlock_recovery(ls);
6044         kfree(ua_tmp);
6045         return error;
6046 }
6047
6048 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6049 {
6050         struct dlm_lkb *lkb;
6051         struct dlm_args args;
6052         struct dlm_user_args *ua;
6053         struct dlm_rsb *r;
6054         int error;
6055
6056         dlm_lock_recovery(ls);
6057
6058         error = find_lkb(ls, lkid, &lkb);
6059         if (error)
6060                 goto out;
6061
6062         ua = lkb->lkb_ua;
6063
6064         error = set_unlock_args(flags, ua, &args);
6065         if (error)
6066                 goto out_put;
6067
6068         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6069
6070         r = lkb->lkb_resource;
6071         hold_rsb(r);
6072         lock_rsb(r);
6073
6074         error = validate_unlock_args(lkb, &args);
6075         if (error)
6076                 goto out_r;
6077         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6078
6079         error = _cancel_lock(r, lkb);
6080  out_r:
6081         unlock_rsb(r);
6082         put_rsb(r);
6083
6084         if (error == -DLM_ECANCEL)
6085                 error = 0;
6086         /* from validate_unlock_args() */
6087         if (error == -EBUSY)
6088                 error = 0;
6089  out_put:
6090         dlm_put_lkb(lkb);
6091  out:
6092         dlm_unlock_recovery(ls);
6093         return error;
6094 }
6095
6096 /* lkb's that are removed from the waiters list by revert are just left on the
6097    orphans list with the granted orphan locks, to be freed by purge */
6098
6099 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6100 {
6101         struct dlm_args args;
6102         int error;
6103
6104         hold_lkb(lkb); /* reference for the ls_orphans list */
6105         mutex_lock(&ls->ls_orphans_mutex);
6106         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6107         mutex_unlock(&ls->ls_orphans_mutex);
6108
6109         set_unlock_args(0, lkb->lkb_ua, &args);
6110
6111         error = cancel_lock(ls, lkb, &args);
6112         if (error == -DLM_ECANCEL)
6113                 error = 0;
6114         return error;
6115 }
6116
6117 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6118    granted.  Regardless of what rsb queue the lock is on, it's removed and
6119    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6120    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6121
6122 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6123 {
6124         struct dlm_args args;
6125         int error;
6126
6127         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6128                         lkb->lkb_ua, &args);
6129
6130         error = unlock_lock(ls, lkb, &args);
6131         if (error == -DLM_EUNLOCK)
6132                 error = 0;
6133         return error;
6134 }
6135
6136 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6137    (which does lock_rsb) due to deadlock with receiving a message that does
6138    lock_rsb followed by dlm_user_add_cb() */
6139
6140 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6141                                      struct dlm_user_proc *proc)
6142 {
6143         struct dlm_lkb *lkb = NULL;
6144
6145         mutex_lock(&ls->ls_clear_proc_locks);
6146         if (list_empty(&proc->locks))
6147                 goto out;
6148
6149         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6150         list_del_init(&lkb->lkb_ownqueue);
6151
6152         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6153                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6154         else
6155                 lkb->lkb_flags |= DLM_IFL_DEAD;
6156  out:
6157         mutex_unlock(&ls->ls_clear_proc_locks);
6158         return lkb;
6159 }
6160
6161 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6162    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6163    which we clear here. */
6164
6165 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6166    list, and no more device_writes should add lkb's to proc->locks list; so we
6167    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6168    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6169    them ourself. */
6170
6171 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6172 {
6173         struct dlm_lkb *lkb, *safe;
6174
6175         dlm_lock_recovery(ls);
6176
6177         while (1) {
6178                 lkb = del_proc_lock(ls, proc);
6179                 if (!lkb)
6180                         break;
6181                 del_timeout(lkb);
6182                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6183                         orphan_proc_lock(ls, lkb);
6184                 else
6185                         unlock_proc_lock(ls, lkb);
6186
6187                 /* this removes the reference for the proc->locks list
6188                    added by dlm_user_request, it may result in the lkb
6189                    being freed */
6190
6191                 dlm_put_lkb(lkb);
6192         }
6193
6194         mutex_lock(&ls->ls_clear_proc_locks);
6195
6196         /* in-progress unlocks */
6197         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6198                 list_del_init(&lkb->lkb_ownqueue);
6199                 lkb->lkb_flags |= DLM_IFL_DEAD;
6200                 dlm_put_lkb(lkb);
6201         }
6202
6203         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6204                 memset(&lkb->lkb_callbacks, 0,
6205                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6206                 list_del_init(&lkb->lkb_cb_list);
6207                 dlm_put_lkb(lkb);
6208         }
6209
6210         mutex_unlock(&ls->ls_clear_proc_locks);
6211         dlm_unlock_recovery(ls);
6212 }
6213
6214 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6215 {
6216         struct dlm_lkb *lkb, *safe;
6217
6218         while (1) {
6219                 lkb = NULL;
6220                 spin_lock(&proc->locks_spin);
6221                 if (!list_empty(&proc->locks)) {
6222                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6223                                          lkb_ownqueue);
6224                         list_del_init(&lkb->lkb_ownqueue);
6225                 }
6226                 spin_unlock(&proc->locks_spin);
6227
6228                 if (!lkb)
6229                         break;
6230
6231                 lkb->lkb_flags |= DLM_IFL_DEAD;
6232                 unlock_proc_lock(ls, lkb);
6233                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6234         }
6235
6236         spin_lock(&proc->locks_spin);
6237         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6238                 list_del_init(&lkb->lkb_ownqueue);
6239                 lkb->lkb_flags |= DLM_IFL_DEAD;
6240                 dlm_put_lkb(lkb);
6241         }
6242         spin_unlock(&proc->locks_spin);
6243
6244         spin_lock(&proc->asts_spin);
6245         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6246                 memset(&lkb->lkb_callbacks, 0,
6247                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6248                 list_del_init(&lkb->lkb_cb_list);
6249                 dlm_put_lkb(lkb);
6250         }
6251         spin_unlock(&proc->asts_spin);
6252 }
6253
6254 /* pid of 0 means purge all orphans */
6255
6256 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6257 {
6258         struct dlm_lkb *lkb, *safe;
6259
6260         mutex_lock(&ls->ls_orphans_mutex);
6261         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6262                 if (pid && lkb->lkb_ownpid != pid)
6263                         continue;
6264                 unlock_proc_lock(ls, lkb);
6265                 list_del_init(&lkb->lkb_ownqueue);
6266                 dlm_put_lkb(lkb);
6267         }
6268         mutex_unlock(&ls->ls_orphans_mutex);
6269 }
6270
6271 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6272 {
6273         struct dlm_message *ms;
6274         struct dlm_mhandle *mh;
6275         int error;
6276
6277         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6278                                 DLM_MSG_PURGE, &ms, &mh);
6279         if (error)
6280                 return error;
6281         ms->m_nodeid = nodeid;
6282         ms->m_pid = pid;
6283
6284         return send_message(mh, ms);
6285 }
6286
6287 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6288                    int nodeid, int pid)
6289 {
6290         int error = 0;
6291
6292         if (nodeid && (nodeid != dlm_our_nodeid())) {
6293                 error = send_purge(ls, nodeid, pid);
6294         } else {
6295                 dlm_lock_recovery(ls);
6296                 if (pid == current->pid)
6297                         purge_proc_locks(ls, proc);
6298                 else
6299                         do_purge(ls, nodeid, pid);
6300                 dlm_unlock_recovery(ls);
6301         }
6302         return error;
6303 }
6304