drivers/staging/lustre/lustre/obdclass/cl_lock.c (firefly-linux-kernel-4.4.55.git)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Extent Lock.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_CLASS
42
43 #include "../include/obd_class.h"
44 #include "../include/obd_support.h"
45 #include "../include/lustre_fid.h"
46 #include <linux/list.h>
47 #include "../include/cl_object.h"
48 #include "cl_internal.h"
49
50 /** Lock class of cl_lock::cll_guard */
51 static struct lock_class_key cl_lock_guard_class;
52 static struct kmem_cache *cl_lock_kmem;
53
54 static struct lu_kmem_descr cl_lock_caches[] = {
55         {
56                 .ckd_cache = &cl_lock_kmem,
57                 .ckd_name  = "cl_lock_kmem",
58                 .ckd_size  = sizeof (struct cl_lock)
59         },
60         {
61                 .ckd_cache = NULL
62         }
63 };
64
65 #define CS_LOCK_INC(o, item)
66 #define CS_LOCK_DEC(o, item)
67 #define CS_LOCKSTATE_INC(o, state)
68 #define CS_LOCKSTATE_DEC(o, state)
69
70 /**
71  * Basic lock invariant that is maintained at all times. Caller either has a
72  * reference to \a lock, or somehow assures that \a lock cannot be freed.
73  *
74  * \see cl_lock_invariant()
75  */
76 static int cl_lock_invariant_trusted(const struct lu_env *env,
77                                      const struct cl_lock *lock)
78 {
79         return  ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
80                 atomic_read(&lock->cll_ref) >= lock->cll_holds &&
81                 lock->cll_holds >= lock->cll_users &&
82                 lock->cll_holds >= 0 &&
83                 lock->cll_users >= 0 &&
84                 lock->cll_depth >= 0;
85 }
86
87 /**
88  * Stronger lock invariant, checking that caller has a reference on a lock.
89  *
90  * \see cl_lock_invariant_trusted()
91  */
92 static int cl_lock_invariant(const struct lu_env *env,
93                              const struct cl_lock *lock)
94 {
95         int result;
96
97         result = atomic_read(&lock->cll_ref) > 0 &&
98                 cl_lock_invariant_trusted(env, lock);
99         if (!result && env != NULL)
100                 CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
101         return result;
102 }
103
104 /**
105  * Returns lock "nesting": 0 for a top-lock and 1 for a sub-lock.
106  */
107 static enum clt_nesting_level cl_lock_nesting(const struct cl_lock *lock)
108 {
109         return cl_object_header(lock->cll_descr.cld_obj)->coh_nesting;
110 }
111
112 /**
113  * Returns a set of counters for this lock, depending on a lock nesting.
114  */
115 static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
116                                                    const struct cl_lock *lock)
117 {
118         struct cl_thread_info *info;
119         enum clt_nesting_level nesting;
120
121         info = cl_env_info(env);
122         nesting = cl_lock_nesting(lock);
123         LASSERT(nesting < ARRAY_SIZE(info->clt_counters));
124         return &info->clt_counters[nesting];
125 }
126
127 static void cl_lock_trace0(int level, const struct lu_env *env,
128                            const char *prefix, const struct cl_lock *lock,
129                            const char *func, const int line)
130 {
131         struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
132         CDEBUG(level, "%s: %p@(%d %p %d %d %d %d %d %lx)(%p/%d/%d) at %s():%d\n",
133                prefix, lock, atomic_read(&lock->cll_ref),
134                lock->cll_guarder, lock->cll_depth,
135                lock->cll_state, lock->cll_error, lock->cll_holds,
136                lock->cll_users, lock->cll_flags,
137                env, h->coh_nesting, cl_lock_nr_mutexed(env),
138                func, line);
139 }
140 #define cl_lock_trace(level, env, prefix, lock)                  \
141         cl_lock_trace0(level, env, prefix, lock, __func__, __LINE__)
142
143 #define RETIP ((unsigned long)__builtin_return_address(0))
144
145 #ifdef CONFIG_LOCKDEP
146 static struct lock_class_key cl_lock_key;
147
148 static void cl_lock_lockdep_init(struct cl_lock *lock)
149 {
150         lockdep_set_class_and_name(lock, &cl_lock_key, "EXT");
151 }
152
153 static void cl_lock_lockdep_acquire(const struct lu_env *env,
154                                     struct cl_lock *lock, __u32 enqflags)
155 {
156         cl_lock_counters(env, lock)->ctc_nr_locks_acquired++;
157         lock_map_acquire(&lock->dep_map);
158 }
159
160 static void cl_lock_lockdep_release(const struct lu_env *env,
161                                     struct cl_lock *lock)
162 {
163         cl_lock_counters(env, lock)->ctc_nr_locks_acquired--;
164         lock_release(&lock->dep_map, 0, RETIP);
165 }
166
167 #else /* !CONFIG_LOCKDEP */
168
169 static void cl_lock_lockdep_init(struct cl_lock *lock)
170 {}
171 static void cl_lock_lockdep_acquire(const struct lu_env *env,
172                                     struct cl_lock *lock, __u32 enqflags)
173 {}
174 static void cl_lock_lockdep_release(const struct lu_env *env,
175                                     struct cl_lock *lock)
176 {}
177
178 #endif /* !CONFIG_LOCKDEP */
179
180 /**
181  * Adds lock slice to the compound lock.
182  *
183  * This is called by cl_object_operations::coo_lock_init() methods to add a
184  * per-layer state to the lock. New state is added at the end of
185  * cl_lock::cll_layers list, that is, it is at the bottom of the stack.
186  *
187  * \see cl_req_slice_add(), cl_page_slice_add(), cl_io_slice_add()
188  */
189 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
190                        struct cl_object *obj,
191                        const struct cl_lock_operations *ops)
192 {
193         slice->cls_lock = lock;
194         list_add_tail(&slice->cls_linkage, &lock->cll_layers);
195         slice->cls_obj = obj;
196         slice->cls_ops = ops;
197 }
198 EXPORT_SYMBOL(cl_lock_slice_add);
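/*
 * Illustrative sketch (not part of this file): a layer's
 * cl_object_operations::coo_lock_init() method typically embeds its slice
 * in a per-layer lock structure and registers it with the compound lock.
 * The names "foo_lock", "fol_cl" and "foo_lock_ops" below are hypothetical.
 *
 *	static int foo_lock_init(const struct lu_env *env, struct cl_object *obj,
 *				 struct cl_lock *lock, const struct cl_io *io)
 *	{
 *		struct foo_lock *fl = kzalloc(sizeof(*fl), GFP_NOFS);
 *
 *		if (fl == NULL)
 *			return -ENOMEM;
 *		cl_lock_slice_add(lock, &fl->fol_cl, obj, &foo_lock_ops);
 *		return 0;
 *	}
 */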
199
200 /**
201  * Returns true iff a lock with the mode \a has provides at least the same
202  * guarantees as a lock with the mode \a need.
203  */
204 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
205 {
206         LINVRNT(need == CLM_READ || need == CLM_WRITE ||
207                 need == CLM_PHANTOM || need == CLM_GROUP);
208         LINVRNT(has == CLM_READ || has == CLM_WRITE ||
209                 has == CLM_PHANTOM || has == CLM_GROUP);
210         CLASSERT(CLM_PHANTOM < CLM_READ);
211         CLASSERT(CLM_READ < CLM_WRITE);
212         CLASSERT(CLM_WRITE < CLM_GROUP);
213
214         if (has != CLM_GROUP)
215                 return need <= has;
216         else
217                 return need == has;
218 }
219 EXPORT_SYMBOL(cl_lock_mode_match);
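/*
 * Worked examples, following directly from the ordering asserted above
 * (CLM_PHANTOM < CLM_READ < CLM_WRITE < CLM_GROUP):
 *
 *	cl_lock_mode_match(CLM_WRITE, CLM_READ)  == 1	write covers read
 *	cl_lock_mode_match(CLM_READ,  CLM_WRITE) == 0
 *	cl_lock_mode_match(CLM_GROUP, CLM_READ)  == 0	group matches group only
 *	cl_lock_mode_match(CLM_GROUP, CLM_GROUP) == 1
 */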
220
221 /**
222  * Returns true iff extent portions of lock descriptions match.
223  */
224 int cl_lock_ext_match(const struct cl_lock_descr *has,
225                       const struct cl_lock_descr *need)
226 {
227         return
228                 has->cld_start <= need->cld_start &&
229                 has->cld_end >= need->cld_end &&
230                 cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
231                 (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
232 }
233 EXPORT_SYMBOL(cl_lock_ext_match);
234
235 /**
236  * Returns true iff a lock with the description \a has provides at least the
237  * same guarantees as a lock with the description \a need.
238  */
239 int cl_lock_descr_match(const struct cl_lock_descr *has,
240                         const struct cl_lock_descr *need)
241 {
242         return
243                 cl_object_same(has->cld_obj, need->cld_obj) &&
244                 cl_lock_ext_match(has, need);
245 }
246 EXPORT_SYMBOL(cl_lock_descr_match);
247
248 static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
249 {
250         struct cl_object *obj = lock->cll_descr.cld_obj;
251
252         LINVRNT(!cl_lock_is_mutexed(lock));
253
254         cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
255         might_sleep();
256         while (!list_empty(&lock->cll_layers)) {
257                 struct cl_lock_slice *slice;
258
259                 slice = list_entry(lock->cll_layers.next,
260                                        struct cl_lock_slice, cls_linkage);
261                 list_del_init(lock->cll_layers.next);
262                 slice->cls_ops->clo_fini(env, slice);
263         }
264         CS_LOCK_DEC(obj, total);
265         CS_LOCKSTATE_DEC(obj, lock->cll_state);
266         lu_object_ref_del_at(&obj->co_lu, &lock->cll_obj_ref, "cl_lock", lock);
267         cl_object_put(env, obj);
268         lu_ref_fini(&lock->cll_reference);
269         lu_ref_fini(&lock->cll_holders);
270         mutex_destroy(&lock->cll_guard);
271         OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
272 }
273
274 /**
275  * Releases a reference on a lock.
276  *
277  * When last reference is released, lock is returned to the cache, unless it
278  * is in cl_lock_state::CLS_FREEING state, in which case it is destroyed
279  * immediately.
280  *
281  * \see cl_object_put(), cl_page_put()
282  */
283 void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
284 {
285         struct cl_object        *obj;
286
287         LINVRNT(cl_lock_invariant(env, lock));
288         obj = lock->cll_descr.cld_obj;
289         LINVRNT(obj != NULL);
290
291         CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n",
292                atomic_read(&lock->cll_ref), lock, RETIP);
293
294         if (atomic_dec_and_test(&lock->cll_ref)) {
295                 if (lock->cll_state == CLS_FREEING) {
296                         LASSERT(list_empty(&lock->cll_linkage));
297                         cl_lock_free(env, lock);
298                 }
299                 CS_LOCK_DEC(obj, busy);
300         }
301 }
302 EXPORT_SYMBOL(cl_lock_put);
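/*
 * Reference-counting sketch (illustrative only): every cl_lock_get() or
 * cl_lock_get_trust() must eventually be balanced by a cl_lock_put().
 * The final put frees the lock only if it is already in CLS_FREEING:
 *
 *	cl_lock_get(lock);		caller already owns a reference
 *	... use the lock ...
 *	cl_lock_put(env, lock);		lock may be freed here if this was
 *					the last reference and the lock is
 *					in CLS_FREEING state
 */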
303
304 /**
305  * Acquires an additional reference to a lock.
306  *
307  * This can be called only by caller already possessing a reference to \a
308  * lock.
309  *
310  * \see cl_object_get(), cl_page_get()
311  */
312 void cl_lock_get(struct cl_lock *lock)
313 {
314         LINVRNT(cl_lock_invariant(NULL, lock));
315         CDEBUG(D_TRACE, "acquiring reference: %d %p %lu\n",
316                atomic_read(&lock->cll_ref), lock, RETIP);
317         atomic_inc(&lock->cll_ref);
318 }
319 EXPORT_SYMBOL(cl_lock_get);
320
321 /**
322  * Acquires a reference to a lock.
323  *
324  * This is much like cl_lock_get(), except that this function can be used to
325  * acquire initial reference to the cached lock. Caller has to deal with all
326  * possible races. Use with care!
327  *
328  * \see cl_page_get_trust()
329  */
330 void cl_lock_get_trust(struct cl_lock *lock)
331 {
332         CDEBUG(D_TRACE, "acquiring trusted reference: %d %p %lu\n",
333                atomic_read(&lock->cll_ref), lock, RETIP);
334         if (atomic_inc_return(&lock->cll_ref) == 1)
335                 CS_LOCK_INC(lock->cll_descr.cld_obj, busy);
336 }
337 EXPORT_SYMBOL(cl_lock_get_trust);
338
339 /**
340  * Helper function destroying the lock that wasn't completely initialized.
341  *
342  * Other threads can acquire references to the top-lock through its
343  * sub-locks. Hence, it cannot be cl_lock_free()-ed immediately.
344  */
345 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
346 {
347         cl_lock_mutex_get(env, lock);
348         cl_lock_cancel(env, lock);
349         cl_lock_delete(env, lock);
350         cl_lock_mutex_put(env, lock);
351         cl_lock_put(env, lock);
352 }
353
354 static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
355                                      struct cl_object *obj,
356                                      const struct cl_io *io,
357                                      const struct cl_lock_descr *descr)
358 {
359         struct cl_lock    *lock;
360         struct lu_object_header *head;
361
362         OBD_SLAB_ALLOC_PTR_GFP(lock, cl_lock_kmem, GFP_NOFS);
363         if (lock != NULL) {
364                 atomic_set(&lock->cll_ref, 1);
365                 lock->cll_descr = *descr;
366                 lock->cll_state = CLS_NEW;
367                 cl_object_get(obj);
368                 lu_object_ref_add_at(&obj->co_lu, &lock->cll_obj_ref, "cl_lock",
369                                      lock);
370                 INIT_LIST_HEAD(&lock->cll_layers);
371                 INIT_LIST_HEAD(&lock->cll_linkage);
372                 INIT_LIST_HEAD(&lock->cll_inclosure);
373                 lu_ref_init(&lock->cll_reference);
374                 lu_ref_init(&lock->cll_holders);
375                 mutex_init(&lock->cll_guard);
376                 lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
377                 init_waitqueue_head(&lock->cll_wq);
378                 head = obj->co_lu.lo_header;
379                 CS_LOCKSTATE_INC(obj, CLS_NEW);
380                 CS_LOCK_INC(obj, total);
381                 CS_LOCK_INC(obj, create);
382                 cl_lock_lockdep_init(lock);
383                 list_for_each_entry(obj, &head->loh_layers,
384                                         co_lu.lo_linkage) {
385                         int err;
386
387                         err = obj->co_ops->coo_lock_init(env, obj, lock, io);
388                         if (err != 0) {
389                                 cl_lock_finish(env, lock);
390                                 lock = ERR_PTR(err);
391                                 break;
392                         }
393                 }
394         } else
395                 lock = ERR_PTR(-ENOMEM);
396         return lock;
397 }
398
399 /**
400  * Transfer the lock into INTRANSIT state and return the original state.
401  *
402  * \pre  state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
403  * \post state: CLS_INTRANSIT
404  * \see CLS_INTRANSIT
405  */
406 static enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
407                                             struct cl_lock *lock)
408 {
409         enum cl_lock_state state = lock->cll_state;
410
411         LASSERT(cl_lock_is_mutexed(lock));
412         LASSERT(state != CLS_INTRANSIT);
413         LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
414                  "Malformed lock state %d.\n", state);
415
416         cl_lock_state_set(env, lock, CLS_INTRANSIT);
417         lock->cll_intransit_owner = current;
418         cl_lock_hold_add(env, lock, "intransit", current);
419         return state;
420 }
421
422 /**
423  * Exit the INTRANSIT state and restore the lock to its original state.
424  */
425 static void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
426                               enum cl_lock_state state)
427 {
428         LASSERT(cl_lock_is_mutexed(lock));
429         LASSERT(lock->cll_state == CLS_INTRANSIT);
430         LASSERT(state != CLS_INTRANSIT);
431         LASSERT(lock->cll_intransit_owner == current);
432
433         lock->cll_intransit_owner = NULL;
434         cl_lock_state_set(env, lock, state);
435         cl_lock_unhold(env, lock, "intransit", current);
436 }
437
438 /**
439  * Check whether the lock is in the INTRANSIT state.
440  */
441 int cl_lock_is_intransit(struct cl_lock *lock)
442 {
443         LASSERT(cl_lock_is_mutexed(lock));
444         return lock->cll_state == CLS_INTRANSIT &&
445                lock->cll_intransit_owner != current;
446 }
447 EXPORT_SYMBOL(cl_lock_is_intransit);
448 /**
449  * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
450  * truncate and O_APPEND cannot be reused for read/non-append-write, as they
451  * cover multiple stripes and can trigger cascading timeouts.
452  */
453 static int cl_lock_fits_into(const struct lu_env *env,
454                              const struct cl_lock *lock,
455                              const struct cl_lock_descr *need,
456                              const struct cl_io *io)
457 {
458         const struct cl_lock_slice *slice;
459
460         LINVRNT(cl_lock_invariant_trusted(env, lock));
461         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
462                 if (slice->cls_ops->clo_fits_into != NULL &&
463                     !slice->cls_ops->clo_fits_into(env, slice, need, io))
464                         return 0;
465         }
466         return 1;
467 }
468
469 static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
470                                       struct cl_object *obj,
471                                       const struct cl_io *io,
472                                       const struct cl_lock_descr *need)
473 {
474         struct cl_lock    *lock;
475         struct cl_object_header *head;
476
477         head = cl_object_header(obj);
478         assert_spin_locked(&head->coh_lock_guard);
479         CS_LOCK_INC(obj, lookup);
480         list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
481                 int matched;
482
483                 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
484                           lock->cll_state < CLS_FREEING &&
485                           lock->cll_error == 0 &&
486                           !(lock->cll_flags & CLF_CANCELLED) &&
487                           cl_lock_fits_into(env, lock, need, io);
488                 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%d) need: "DDESCR": %d\n",
489                        PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
490                        matched);
491                 if (matched) {
492                         cl_lock_get_trust(lock);
493                         CS_LOCK_INC(obj, hit);
494                         return lock;
495                 }
496         }
497         return NULL;
498 }
499
500 /**
501  * Returns a lock matching description \a need.
502  *
503  * This is the main entry point into the cl_lock caching interface. First, a
504  * cache (implemented as a per-object linked list) is consulted. If lock is
505  * found there, it is returned immediately. Otherwise new lock is allocated
506  * and returned. In any case, additional reference to lock is acquired.
507  *
508  * \see cl_object_find(), cl_page_find()
509  */
510 static struct cl_lock *cl_lock_find(const struct lu_env *env,
511                                     const struct cl_io *io,
512                                     const struct cl_lock_descr *need)
513 {
514         struct cl_object_header *head;
515         struct cl_object        *obj;
516         struct cl_lock    *lock;
517
518         obj  = need->cld_obj;
519         head = cl_object_header(obj);
520
521         spin_lock(&head->coh_lock_guard);
522         lock = cl_lock_lookup(env, obj, io, need);
523         spin_unlock(&head->coh_lock_guard);
524
525         if (lock == NULL) {
526                 lock = cl_lock_alloc(env, obj, io, need);
527                 if (!IS_ERR(lock)) {
528                         struct cl_lock *ghost;
529
530                         spin_lock(&head->coh_lock_guard);
531                         ghost = cl_lock_lookup(env, obj, io, need);
532                         if (ghost == NULL) {
533                                 cl_lock_get_trust(lock);
534                                 list_add_tail(&lock->cll_linkage,
535                                                   &head->coh_locks);
536                                 spin_unlock(&head->coh_lock_guard);
537                                 CS_LOCK_INC(obj, busy);
538                         } else {
539                                 spin_unlock(&head->coh_lock_guard);
540                                 /*
541                                  * Other threads can acquire references to the
542                                  * top-lock through its sub-locks. Hence, it
543                                  * cannot be cl_lock_free()-ed immediately.
544                                  */
545                                 cl_lock_finish(env, lock);
546                                 lock = ghost;
547                         }
548                 }
549         }
550         return lock;
551 }
552
553 /**
554  * Returns existing lock matching given description. This is similar to
555  * cl_lock_find() except that no new lock is created, and returned lock is
556  * guaranteed to be in enum cl_lock_state::CLS_HELD state.
557  */
558 struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
559                              const struct cl_lock_descr *need,
560                              const char *scope, const void *source)
561 {
562         struct cl_object_header *head;
563         struct cl_object        *obj;
564         struct cl_lock    *lock;
565
566         obj  = need->cld_obj;
567         head = cl_object_header(obj);
568
569         do {
570                 spin_lock(&head->coh_lock_guard);
571                 lock = cl_lock_lookup(env, obj, io, need);
572                 spin_unlock(&head->coh_lock_guard);
573                 if (lock == NULL)
574                         return NULL;
575
576                 cl_lock_mutex_get(env, lock);
577                 if (lock->cll_state == CLS_INTRANSIT)
578                         /* Don't care about the return value. */
579                         cl_lock_state_wait(env, lock);
580                 if (lock->cll_state == CLS_FREEING) {
581                         cl_lock_mutex_put(env, lock);
582                         cl_lock_put(env, lock);
583                         lock = NULL;
584                 }
585         } while (lock == NULL);
586
587         cl_lock_hold_add(env, lock, scope, source);
588         cl_lock_user_add(env, lock);
589         if (lock->cll_state == CLS_CACHED)
590                 cl_use_try(env, lock, 1);
591         if (lock->cll_state == CLS_HELD) {
592                 cl_lock_mutex_put(env, lock);
593                 cl_lock_lockdep_acquire(env, lock, 0);
594                 cl_lock_put(env, lock);
595         } else {
596                 cl_unuse_try(env, lock);
597                 cl_lock_unhold(env, lock, scope, source);
598                 cl_lock_mutex_put(env, lock);
599                 cl_lock_put(env, lock);
600                 lock = NULL;
601         }
602
603         return lock;
604 }
605 EXPORT_SYMBOL(cl_lock_peek);
606
607 /**
608  * Returns a slice within a lock, corresponding to the given layer in the
609  * device stack.
610  *
611  * \see cl_page_at()
612  */
613 const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
614                                        const struct lu_device_type *dtype)
615 {
616         const struct cl_lock_slice *slice;
617
618         LINVRNT(cl_lock_invariant_trusted(NULL, lock));
619
620         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
621                 if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
622                         return slice;
623         }
624         return NULL;
625 }
626 EXPORT_SYMBOL(cl_lock_at);
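/*
 * Illustrative sketch: layers usually wrap cl_lock_at() with their own
 * device type to locate their private slice. "foo_device_type", "foo_lock"
 * and "fol_cl" below are hypothetical names.
 *
 *	static struct foo_lock *cl2foo_lock(const struct cl_lock *lock)
 *	{
 *		const struct cl_lock_slice *slice;
 *
 *		slice = cl_lock_at(lock, &foo_device_type);
 *		return slice != NULL ?
 *			container_of(slice, struct foo_lock, fol_cl) : NULL;
 *	}
 */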
627
628 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
629 {
630         struct cl_thread_counters *counters;
631
632         counters = cl_lock_counters(env, lock);
633         lock->cll_depth++;
634         counters->ctc_nr_locks_locked++;
635         lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
636         cl_lock_trace(D_TRACE, env, "got mutex", lock);
637 }
638
639 /**
640  * Locks cl_lock object.
641  *
642  * This is used to manipulate cl_lock fields, and to serialize state
643  * transitions in the lock state machine.
644  *
645  * \post cl_lock_is_mutexed(lock)
646  *
647  * \see cl_lock_mutex_put()
648  */
649 void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
650 {
651         LINVRNT(cl_lock_invariant(env, lock));
652
653         if (lock->cll_guarder == current) {
654                 LINVRNT(cl_lock_is_mutexed(lock));
655                 LINVRNT(lock->cll_depth > 0);
656         } else {
657                 struct cl_object_header *hdr;
658                 struct cl_thread_info   *info;
659                 int i;
660
661                 LINVRNT(lock->cll_guarder != current);
662                 hdr = cl_object_header(lock->cll_descr.cld_obj);
663                 /*
664                  * Check that mutices are taken in the bottom-to-top order.
665                  */
666                 info = cl_env_info(env);
667                 for (i = 0; i < hdr->coh_nesting; ++i)
668                         LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
669                 mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
670                 lock->cll_guarder = current;
671                 LINVRNT(lock->cll_depth == 0);
672         }
673         cl_lock_mutex_tail(env, lock);
674 }
675 EXPORT_SYMBOL(cl_lock_mutex_get);
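/*
 * Typical usage sketch: the lock mutex is taken around any manipulation of
 * cl_lock fields or state transitions, and is recursive for the owner:
 *
 *	cl_lock_mutex_get(env, lock);
 *	LASSERT(cl_lock_is_mutexed(lock));
 *	... inspect or update lock->cll_state, cll_flags, counters ...
 *	cl_lock_mutex_put(env, lock);
 */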
676
677 /**
678  * Try-locks cl_lock object.
679  *
680  * \retval 0 \a lock was successfully locked
681  *
682  * \retval -EBUSY \a lock cannot be locked right now
683  *
684  * \post ergo(result == 0, cl_lock_is_mutexed(lock))
685  *
686  * \see cl_lock_mutex_get()
687  */
688 int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
689 {
690         int result;
691
692         LINVRNT(cl_lock_invariant_trusted(env, lock));
693
694         result = 0;
695         if (lock->cll_guarder == current) {
696                 LINVRNT(lock->cll_depth > 0);
697                 cl_lock_mutex_tail(env, lock);
698         } else if (mutex_trylock(&lock->cll_guard)) {
699                 LINVRNT(lock->cll_depth == 0);
700                 lock->cll_guarder = current;
701                 cl_lock_mutex_tail(env, lock);
702         } else
703                 result = -EBUSY;
704         return result;
705 }
706 EXPORT_SYMBOL(cl_lock_mutex_try);
707
708 /**
709  * Unlocks cl_lock object.
710  *
711  * \pre cl_lock_is_mutexed(lock)
712  *
713  * \see cl_lock_mutex_get()
714  */
715 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
716 {
717         struct cl_thread_counters *counters;
718
719         LINVRNT(cl_lock_invariant(env, lock));
720         LINVRNT(cl_lock_is_mutexed(lock));
721         LINVRNT(lock->cll_guarder == current);
722         LINVRNT(lock->cll_depth > 0);
723
724         counters = cl_lock_counters(env, lock);
725         LINVRNT(counters->ctc_nr_locks_locked > 0);
726
727         cl_lock_trace(D_TRACE, env, "put mutex", lock);
728         lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
729         counters->ctc_nr_locks_locked--;
730         if (--lock->cll_depth == 0) {
731                 lock->cll_guarder = NULL;
732                 mutex_unlock(&lock->cll_guard);
733         }
734 }
735 EXPORT_SYMBOL(cl_lock_mutex_put);
736
737 /**
738  * Returns true iff lock's mutex is owned by the current thread.
739  */
740 int cl_lock_is_mutexed(struct cl_lock *lock)
741 {
742         return lock->cll_guarder == current;
743 }
744 EXPORT_SYMBOL(cl_lock_is_mutexed);
745
746 /**
747  * Returns number of cl_lock mutices held by the current thread (environment).
748  */
749 int cl_lock_nr_mutexed(const struct lu_env *env)
750 {
751         struct cl_thread_info *info;
752         int i;
753         int locked;
754
755         /*
756          * NOTE: if summation across all nesting levels (currently 2) proves
757          *       too expensive, a summary counter can be added to
758          *       struct cl_thread_info.
759          */
760         info = cl_env_info(env);
761         for (i = 0, locked = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
762                 locked += info->clt_counters[i].ctc_nr_locks_locked;
763         return locked;
764 }
765 EXPORT_SYMBOL(cl_lock_nr_mutexed);
766
767 static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
768 {
769         LINVRNT(cl_lock_is_mutexed(lock));
770         LINVRNT(cl_lock_invariant(env, lock));
771         if (!(lock->cll_flags & CLF_CANCELLED)) {
772                 const struct cl_lock_slice *slice;
773
774                 lock->cll_flags |= CLF_CANCELLED;
775                 list_for_each_entry_reverse(slice, &lock->cll_layers,
776                                                 cls_linkage) {
777                         if (slice->cls_ops->clo_cancel != NULL)
778                                 slice->cls_ops->clo_cancel(env, slice);
779                 }
780         }
781 }
782
783 static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
784 {
785         struct cl_object_header    *head;
786         const struct cl_lock_slice *slice;
787
788         LINVRNT(cl_lock_is_mutexed(lock));
789         LINVRNT(cl_lock_invariant(env, lock));
790
791         if (lock->cll_state < CLS_FREEING) {
792                 bool in_cache;
793
794                 LASSERT(lock->cll_state != CLS_INTRANSIT);
795                 cl_lock_state_set(env, lock, CLS_FREEING);
796
797                 head = cl_object_header(lock->cll_descr.cld_obj);
798
799                 spin_lock(&head->coh_lock_guard);
800                 in_cache = !list_empty(&lock->cll_linkage);
801                 if (in_cache)
802                         list_del_init(&lock->cll_linkage);
803                 spin_unlock(&head->coh_lock_guard);
804
805                 if (in_cache) /* coh_locks cache holds a refcount. */
806                         cl_lock_put(env, lock);
807
808                 /*
809                  * From now on, no new references to this lock can be acquired
810                  * by cl_lock_lookup().
811                  */
812                 list_for_each_entry_reverse(slice, &lock->cll_layers,
813                                                 cls_linkage) {
814                         if (slice->cls_ops->clo_delete != NULL)
815                                 slice->cls_ops->clo_delete(env, slice);
816                 }
817                 /*
818                  * From now on, no new references to this lock can be acquired
819                  * by layer-specific means (like a pointer from struct
820                  * ldlm_lock in osc, or a pointer from top-lock to sub-lock in
821                  * lov).
822                  *
823                  * Lock will be finally freed in cl_lock_put() when last of
824                  * existing references goes away.
825                  */
826         }
827 }
828
829 /**
830  * Mod(ifie)s cl_lock::cll_holds counter for a given lock. Also, for a
831  * top-lock (nesting == 0) accounts for this modification in the per-thread
832  * debugging counters. Sub-lock holds can be released by a thread different
833  * from one that acquired it.
834  */
835 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
836                              int delta)
837 {
838         struct cl_thread_counters *counters;
839         enum clt_nesting_level     nesting;
840
841         lock->cll_holds += delta;
842         nesting = cl_lock_nesting(lock);
843         if (nesting == CNL_TOP) {
844                 counters = &cl_env_info(env)->clt_counters[CNL_TOP];
845                 counters->ctc_nr_held += delta;
846                 LASSERT(counters->ctc_nr_held >= 0);
847         }
848 }
849
850 /**
851  * Mod(ifie)s cl_lock::cll_users counter for a given lock. See
852  * cl_lock_hold_mod() for the explanation of the debugging code.
853  */
854 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
855                              int delta)
856 {
857         struct cl_thread_counters *counters;
858         enum clt_nesting_level     nesting;
859
860         lock->cll_users += delta;
861         nesting = cl_lock_nesting(lock);
862         if (nesting == CNL_TOP) {
863                 counters = &cl_env_info(env)->clt_counters[CNL_TOP];
864                 counters->ctc_nr_used += delta;
865                 LASSERT(counters->ctc_nr_used >= 0);
866         }
867 }
868
869 void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
870                           const char *scope, const void *source)
871 {
872         LINVRNT(cl_lock_is_mutexed(lock));
873         LINVRNT(cl_lock_invariant(env, lock));
874         LASSERT(lock->cll_holds > 0);
875
876         cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
877         lu_ref_del(&lock->cll_holders, scope, source);
878         cl_lock_hold_mod(env, lock, -1);
879         if (lock->cll_holds == 0) {
880                 CL_LOCK_ASSERT(lock->cll_state != CLS_HELD, env, lock);
881                 if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
882                     lock->cll_descr.cld_mode == CLM_GROUP ||
883                     lock->cll_state != CLS_CACHED)
884                         /*
885                          * If lock is still phantom or grouplock when user is
886                          * done with it---destroy the lock.
887                          */
888                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
889                 if (lock->cll_flags & CLF_CANCELPEND) {
890                         lock->cll_flags &= ~CLF_CANCELPEND;
891                         cl_lock_cancel0(env, lock);
892                 }
893                 if (lock->cll_flags & CLF_DOOMED) {
894                         /* no longer doomed: it's dead... Jim. */
895                         lock->cll_flags &= ~CLF_DOOMED;
896                         cl_lock_delete0(env, lock);
897                 }
898         }
899 }
900 EXPORT_SYMBOL(cl_lock_hold_release);
901
902 /**
903  * Waits until lock state is changed.
904  *
905  * This function is called with cl_lock mutex locked, atomically releases
906  * mutex and goes to sleep, waiting for a lock state change (signaled by
907  * cl_lock_signal()), and re-acquires the mutex before return.
908  *
909  * This function is used to wait until lock state machine makes some progress
910  * and to emulate synchronous operations on top of asynchronous lock
911  * interface.
912  *
913  * \retval -EINTR wait was interrupted
914  *
915  * \retval 0 wait wasn't interrupted
916  *
917  * \pre cl_lock_is_mutexed(lock)
918  *
919  * \see cl_lock_signal()
920  */
921 int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
922 {
923         wait_queue_t waiter;
924         sigset_t blocked;
925         int result;
926
927         LINVRNT(cl_lock_is_mutexed(lock));
928         LINVRNT(cl_lock_invariant(env, lock));
929         LASSERT(lock->cll_depth == 1);
930         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
931
932         cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
933         result = lock->cll_error;
934         if (result == 0) {
935                 /* To avoid being interrupted by the 'non-fatal' signals
936                  * (SIGCHLD, for instance), we block them temporarily.
937                  * LU-305 */
938                 blocked = cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
939
940                 init_waitqueue_entry(&waiter, current);
941                 add_wait_queue(&lock->cll_wq, &waiter);
942                 set_current_state(TASK_INTERRUPTIBLE);
943                 cl_lock_mutex_put(env, lock);
944
945                 LASSERT(cl_lock_nr_mutexed(env) == 0);
946
947                 /* Returning ERESTARTSYS instead of EINTR so syscalls
948                  * can be restarted if signals are pending here */
949                 result = -ERESTARTSYS;
950                 if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LOCK_STATE_WAIT_INTR))) {
951                         schedule();
952                         if (!cfs_signal_pending())
953                                 result = 0;
954                 }
955
956                 cl_lock_mutex_get(env, lock);
957                 set_current_state(TASK_RUNNING);
958                 remove_wait_queue(&lock->cll_wq, &waiter);
959
960                 /* Restore old blocked signals */
961                 cfs_restore_sigs(blocked);
962         }
963         return result;
964 }
965 EXPORT_SYMBOL(cl_lock_state_wait);
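/*
 * Canonical retry pattern (sketch): callers of the *_try() state-machine
 * helpers sleep in cl_lock_state_wait() whenever CLO_WAIT is returned, as
 * cl_enqueue_locked() and cl_wait() below do:
 *
 *	do {
 *		result = cl_wait_try(env, lock);
 *		if (result == CLO_WAIT) {
 *			result = cl_lock_state_wait(env, lock);
 *			if (result == 0)
 *				continue;
 *		}
 *		break;
 *	} while (1);
 */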
966
967 static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
968                                  enum cl_lock_state state)
969 {
970         const struct cl_lock_slice *slice;
971
972         LINVRNT(cl_lock_is_mutexed(lock));
973         LINVRNT(cl_lock_invariant(env, lock));
974
975         list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
976                 if (slice->cls_ops->clo_state != NULL)
977                         slice->cls_ops->clo_state(env, slice, state);
978         wake_up_all(&lock->cll_wq);
979 }
980
981 /**
982  * Notifies waiters that lock state changed.
983  *
984  * Wakes up all waiters sleeping in cl_lock_state_wait(), also notifies all
985  * layers about state change by calling cl_lock_operations::clo_state()
986  * top-to-bottom.
987  */
988 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
989 {
990         cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock);
991         cl_lock_state_signal(env, lock, lock->cll_state);
992 }
993 EXPORT_SYMBOL(cl_lock_signal);
994
995 /**
996  * Changes lock state.
997  *
998  * This function is invoked to notify layers that lock state changed, possibly
999  * as a result of an asynchronous event such as call-back reception.
1000  *
1001  * \post lock->cll_state == state
1002  *
1003  * \see cl_lock_operations::clo_state()
1004  */
1005 void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
1006                        enum cl_lock_state state)
1007 {
1008         LASSERT(lock->cll_state <= state ||
1009                 (lock->cll_state == CLS_CACHED &&
1010                  (state == CLS_HELD || /* lock found in cache */
1011                   state == CLS_NEW  ||   /* sub-lock canceled */
1012                   state == CLS_INTRANSIT)) ||
1013                 /* lock is in transit state */
1014                 lock->cll_state == CLS_INTRANSIT);
1015
1016         if (lock->cll_state != state) {
1017                 CS_LOCKSTATE_DEC(lock->cll_descr.cld_obj, lock->cll_state);
1018                 CS_LOCKSTATE_INC(lock->cll_descr.cld_obj, state);
1019
1020                 cl_lock_state_signal(env, lock, state);
1021                 lock->cll_state = state;
1022         }
1023 }
1024 EXPORT_SYMBOL(cl_lock_state_set);
1025
1026 static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
1027 {
1028         const struct cl_lock_slice *slice;
1029         int result;
1030
1031         do {
1032                 result = 0;
1033
1034                 LINVRNT(cl_lock_is_mutexed(lock));
1035                 LINVRNT(cl_lock_invariant(env, lock));
1036                 LASSERT(lock->cll_state == CLS_INTRANSIT);
1037
1038                 result = -ENOSYS;
1039                 list_for_each_entry_reverse(slice, &lock->cll_layers,
1040                                                 cls_linkage) {
1041                         if (slice->cls_ops->clo_unuse != NULL) {
1042                                 result = slice->cls_ops->clo_unuse(env, slice);
1043                                 if (result != 0)
1044                                         break;
1045                         }
1046                 }
1047                 LASSERT(result != -ENOSYS);
1048         } while (result == CLO_REPEAT);
1049
1050         return result;
1051 }
1052
1053 /**
1054  * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
1055  * cl_lock_operations::clo_use() top-to-bottom to notify layers.
1056  * If @atomic is 1, the lock must be unused on failure to recover it, so
1057  * that the use process stays atomic.
1058  */
1059 int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
1060 {
1061         const struct cl_lock_slice *slice;
1062         int result;
1063         enum cl_lock_state state;
1064
1065         cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
1066
1067         LASSERT(lock->cll_state == CLS_CACHED);
1068         if (lock->cll_error)
1069                 return lock->cll_error;
1070
1071         result = -ENOSYS;
1072         state = cl_lock_intransit(env, lock);
1073         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1074                 if (slice->cls_ops->clo_use != NULL) {
1075                         result = slice->cls_ops->clo_use(env, slice);
1076                         if (result != 0)
1077                                 break;
1078                 }
1079         }
1080         LASSERT(result != -ENOSYS);
1081
1082         LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
1083                  lock->cll_state);
1084
1085         if (result == 0) {
1086                 state = CLS_HELD;
1087         } else {
1088                 if (result == -ESTALE) {
1089                         /*
1090                          * -ESTALE means a sublock is being cancelled
1091                          * at this time; set the lock state to
1092                          * CLS_NEW here and ask the caller to repeat.
1093                          */
1094                         state = CLS_NEW;
1095                         result = CLO_REPEAT;
1096                 }
1097
1098                 /* @atomic means back-off-on-failure. */
1099                 if (atomic) {
1100                         int rc;
1101                         rc = cl_unuse_try_internal(env, lock);
1102                         /* Vet the results. */
1103                         if (rc < 0 && result > 0)
1104                                 result = rc;
1105                 }
1106
1107         }
1108         cl_lock_extransit(env, lock, state);
1109         return result;
1110 }
1111 EXPORT_SYMBOL(cl_use_try);
1112
1113 /**
1114  * Helper for cl_enqueue_try() that calls ->clo_enqueue() across all layers
1115  * top-to-bottom.
1116  */
1117 static int cl_enqueue_kick(const struct lu_env *env,
1118                            struct cl_lock *lock,
1119                            struct cl_io *io, __u32 flags)
1120 {
1121         int result;
1122         const struct cl_lock_slice *slice;
1123
1124         result = -ENOSYS;
1125         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1126                 if (slice->cls_ops->clo_enqueue != NULL) {
1127                         result = slice->cls_ops->clo_enqueue(env,
1128                                                              slice, io, flags);
1129                         if (result != 0)
1130                                 break;
1131                 }
1132         }
1133         LASSERT(result != -ENOSYS);
1134         return result;
1135 }
1136
1137 /**
1138  * Tries to enqueue a lock.
1139  *
1140  * This function is called repeatedly by cl_enqueue() until either lock is
1141  * enqueued, or error occurs. This function does not block waiting for
1142  * networking communication to complete.
1143  *
1144  * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1145  *                       lock->cll_state == CLS_HELD)
1146  *
1147  * \see cl_enqueue() cl_lock_operations::clo_enqueue()
1148  * \see cl_lock_state::CLS_ENQUEUED
1149  */
1150 int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
1151                    struct cl_io *io, __u32 flags)
1152 {
1153         int result;
1154
1155         cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
1156         do {
1157                 LINVRNT(cl_lock_is_mutexed(lock));
1158
1159                 result = lock->cll_error;
1160                 if (result != 0)
1161                         break;
1162
1163                 switch (lock->cll_state) {
1164                 case CLS_NEW:
1165                         cl_lock_state_set(env, lock, CLS_QUEUING);
1166                         /* fall-through */
1167                 case CLS_QUEUING:
1168                         /* kick layers. */
1169                         result = cl_enqueue_kick(env, lock, io, flags);
1170                         /* In the AGL case, cl_lock::cll_state may
1171                          * already have become CLS_HELD. */
1172                         if (result == 0 && lock->cll_state == CLS_QUEUING)
1173                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
1174                         break;
1175                 case CLS_INTRANSIT:
1176                         LASSERT(cl_lock_is_intransit(lock));
1177                         result = CLO_WAIT;
1178                         break;
1179                 case CLS_CACHED:
1180                         /* yank lock from the cache. */
1181                         result = cl_use_try(env, lock, 0);
1182                         break;
1183                 case CLS_ENQUEUED:
1184                 case CLS_HELD:
1185                         result = 0;
1186                         break;
1187                 default:
1188                 case CLS_FREEING:
1189                         /*
1190                          * impossible, only held locks with increased
1191                          * ->cll_holds can be enqueued, and they cannot be
1192                          * freed.
1193                          */
1194                         LBUG();
1195                 }
1196         } while (result == CLO_REPEAT);
1197         return result;
1198 }
1199 EXPORT_SYMBOL(cl_enqueue_try);
1200
1201 /**
1202  * Cancel the conflicting lock found during previous enqueue.
1203  *
1204  * \retval 0 conflicting lock has been canceled.
1205  * \retval -ve error code.
1206  */
1207 int cl_lock_enqueue_wait(const struct lu_env *env,
1208                          struct cl_lock *lock,
1209                          int keep_mutex)
1210 {
1211         struct cl_lock  *conflict;
1212         int           rc = 0;
1213
1214         LASSERT(cl_lock_is_mutexed(lock));
1215         LASSERT(lock->cll_state == CLS_QUEUING);
1216         LASSERT(lock->cll_conflict != NULL);
1217
1218         conflict = lock->cll_conflict;
1219         lock->cll_conflict = NULL;
1220
1221         cl_lock_mutex_put(env, lock);
1222         LASSERT(cl_lock_nr_mutexed(env) == 0);
1223
1224         cl_lock_mutex_get(env, conflict);
1225         cl_lock_trace(D_DLMTRACE, env, "enqueue wait", conflict);
1226         cl_lock_cancel(env, conflict);
1227         cl_lock_delete(env, conflict);
1228
1229         while (conflict->cll_state != CLS_FREEING) {
1230                 rc = cl_lock_state_wait(env, conflict);
1231                 if (rc != 0)
1232                         break;
1233         }
1234         cl_lock_mutex_put(env, conflict);
1235         lu_ref_del(&conflict->cll_reference, "cancel-wait", lock);
1236         cl_lock_put(env, conflict);
1237
1238         if (keep_mutex)
1239                 cl_lock_mutex_get(env, lock);
1240
1241         LASSERT(rc <= 0);
1242         return rc;
1243 }
1244 EXPORT_SYMBOL(cl_lock_enqueue_wait);
1245
1246 static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
1247                              struct cl_io *io, __u32 enqflags)
1248 {
1249         int result;
1250
1251         LINVRNT(cl_lock_is_mutexed(lock));
1252         LINVRNT(cl_lock_invariant(env, lock));
1253         LASSERT(lock->cll_holds > 0);
1254
1255         cl_lock_user_add(env, lock);
1256         do {
1257                 result = cl_enqueue_try(env, lock, io, enqflags);
1258                 if (result == CLO_WAIT) {
1259                         if (lock->cll_conflict != NULL)
1260                                 result = cl_lock_enqueue_wait(env, lock, 1);
1261                         else
1262                                 result = cl_lock_state_wait(env, lock);
1263                         if (result == 0)
1264                                 continue;
1265                 }
1266                 break;
1267         } while (1);
1268         if (result != 0)
1269                 cl_unuse_try(env, lock);
1270         LASSERT(ergo(result == 0 && !(enqflags & CEF_AGL),
1271                      lock->cll_state == CLS_ENQUEUED ||
1272                      lock->cll_state == CLS_HELD));
1273         return result;
1274 }
1275
1276 /**
1277  * Tries to unlock a lock.
1278  *
1279  * This function is called to release underlying resource:
1280  * 1. for top lock, the resource is sublocks it held;
1281  * 2. for sublock, the resource is the reference to dlmlock.
1282  *
1283  * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
1284  *
1285  * \see cl_unuse() cl_lock_operations::clo_unuse()
1286  * \see cl_lock_state::CLS_CACHED
1287  */
1288 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
1289 {
1290         int                      result;
1291         enum cl_lock_state        state = CLS_NEW;
1292
1293         cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
1294
1295         if (lock->cll_users > 1) {
1296                 cl_lock_user_del(env, lock);
1297                 return 0;
1298         }
1299
1300         /* Only a lock in the CLS_HELD or CLS_ENQUEUED state can hold
1301          * underlying resources. */
1302         if (!(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED)) {
1303                 cl_lock_user_del(env, lock);
1304                 return 0;
1305         }
1306
1307         /*
1308          * New lock users (->cll_users) are not protecting unlocking
1309          * from proceeding. From this point, lock eventually reaches
1310          * CLS_CACHED, is reinitialized to CLS_NEW or fails into
1311          * CLS_FREEING.
1312          */
1313         state = cl_lock_intransit(env, lock);
1314
1315         result = cl_unuse_try_internal(env, lock);
1316         LASSERT(lock->cll_state == CLS_INTRANSIT);
1317         LASSERT(result != CLO_WAIT);
1318         cl_lock_user_del(env, lock);
1319         if (result == 0 || result == -ESTALE) {
1320                 /*
1321                  * Return lock back to the cache. This is the only
1322                  * place where lock is moved into CLS_CACHED state.
1323                  *
1324                  * If one of ->clo_unuse() methods returned -ESTALE, lock
1325                  * cannot be placed into cache and has to be
1326                  * re-initialized. This happens e.g., when a sub-lock was
1327                  * canceled while unlocking was in progress.
1328                  */
1329                 if (state == CLS_HELD && result == 0)
1330                         state = CLS_CACHED;
1331                 else
1332                         state = CLS_NEW;
1333                 cl_lock_extransit(env, lock, state);
1334
1335                 /*
1336                  * Hide -ESTALE error.
1337                  * Suppose the lock is a glimpse lock covering multiple
1338                  * stripes, one of its sublocks returned -ENAVAIL, and the
1339                  * other sublocks matched write locks. In this case we
1340                  * can't set this lock to error, because otherwise some of
1341                  * its sublocks might not be cancelled and some dirty
1342                  * pages would never be written to the OSTs. -jay
1343                  */
1344                 result = 0;
1345         } else {
1346                 CERROR("result = %d, this is unlikely!\n", result);
1347                 state = CLS_NEW;
1348                 cl_lock_extransit(env, lock, state);
1349         }
1350         return result ?: lock->cll_error;
1351 }
1352 EXPORT_SYMBOL(cl_unuse_try);
1353
1354 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
1355 {
1356         int result;
1357
1358         result = cl_unuse_try(env, lock);
1359         if (result)
1360                 CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result);
1361 }
1362
1363 /**
1364  * Unlocks a lock.
1365  */
1366 void cl_unuse(const struct lu_env *env, struct cl_lock *lock)
1367 {
1368         cl_lock_mutex_get(env, lock);
1369         cl_unuse_locked(env, lock);
1370         cl_lock_mutex_put(env, lock);
1371         cl_lock_lockdep_release(env, lock);
1372 }
1373 EXPORT_SYMBOL(cl_unuse);
1374
1375 /**
1376  * Tries to wait for a lock.
1377  *
1378  * This function is called repeatedly by cl_wait() until either lock is
1379  * granted, or error occurs. This function does not block waiting for network
1380  * communication to complete.
1381  *
1382  * \see cl_wait() cl_lock_operations::clo_wait()
1383  * \see cl_lock_state::CLS_HELD
1384  */
1385 int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
1386 {
1387         const struct cl_lock_slice *slice;
1388         int                      result;
1389
1390         cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
1391         do {
1392                 LINVRNT(cl_lock_is_mutexed(lock));
1393                 LINVRNT(cl_lock_invariant(env, lock));
1394                 LASSERTF(lock->cll_state == CLS_QUEUING ||
1395                          lock->cll_state == CLS_ENQUEUED ||
1396                          lock->cll_state == CLS_HELD ||
1397                          lock->cll_state == CLS_INTRANSIT,
1398                          "lock state: %d\n", lock->cll_state);
1399                 LASSERT(lock->cll_users > 0);
1400                 LASSERT(lock->cll_holds > 0);
1401
1402                 result = lock->cll_error;
1403                 if (result != 0)
1404                         break;
1405
1406                 if (cl_lock_is_intransit(lock)) {
1407                         result = CLO_WAIT;
1408                         break;
1409                 }
1410
1411                 if (lock->cll_state == CLS_HELD)
1412                         /* nothing to do */
1413                         break;
1414
1415                 result = -ENOSYS;
1416                 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1417                         if (slice->cls_ops->clo_wait != NULL) {
1418                                 result = slice->cls_ops->clo_wait(env, slice);
1419                                 if (result != 0)
1420                                         break;
1421                         }
1422                 }
1423                 LASSERT(result != -ENOSYS);
1424                 if (result == 0) {
1425                         LASSERT(lock->cll_state != CLS_INTRANSIT);
1426                         cl_lock_state_set(env, lock, CLS_HELD);
1427                 }
1428         } while (result == CLO_REPEAT);
1429         return result;
1430 }
1431 EXPORT_SYMBOL(cl_wait_try);
1432
1433 /**
1434  * Waits until enqueued lock is granted.
1435  *
1436  * \pre current thread or io owns a hold on the lock
1437  * \pre ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1438  *                      lock->cll_state == CLS_HELD)
1439  *
1440  * \post ergo(result == 0, lock->cll_state == CLS_HELD)
1441  */
1442 int cl_wait(const struct lu_env *env, struct cl_lock *lock)
1443 {
1444         int result;
1445
1446         cl_lock_mutex_get(env, lock);
1447
1448         LINVRNT(cl_lock_invariant(env, lock));
1449         LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
1450                  "Wrong state %d \n", lock->cll_state);
1451         LASSERT(lock->cll_holds > 0);
1452
1453         do {
1454                 result = cl_wait_try(env, lock);
1455                 if (result == CLO_WAIT) {
1456                         result = cl_lock_state_wait(env, lock);
1457                         if (result == 0)
1458                                 continue;
1459                 }
1460                 break;
1461         } while (1);
1462         if (result < 0) {
1463                 cl_unuse_try(env, lock);
1464                 cl_lock_lockdep_release(env, lock);
1465         }
1466         cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
1467         cl_lock_mutex_put(env, lock);
1468         LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
1469         return result;
1470 }
1471 EXPORT_SYMBOL(cl_wait);
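/*
 * A minimal illustrative sketch (assumptions: "env" and an enqueued "lock"
 * on which the caller owns a hold were obtained elsewhere) of how cl_wait()
 * is typically used:
 *
 *	rc = cl_wait(env, lock);
 *
 * On success (rc == 0) the lock is in CLS_HELD and the I/O can proceed; on
 * failure the caller still owns its hold and drops it through its normal
 * cleanup path.
 */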
1472
1473 /**
1474  * Executes cl_lock_operations::clo_weigh() for each layer and sums the
1475  * results to estimate the lock's weight.
1476  */
1477 unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
1478 {
1479         const struct cl_lock_slice *slice;
1480         unsigned long pound;
1481         unsigned long ounce;
1482
1483         LINVRNT(cl_lock_is_mutexed(lock));
1484         LINVRNT(cl_lock_invariant(env, lock));
1485
1486         pound = 0;
1487         list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1488                 if (slice->cls_ops->clo_weigh != NULL) {
1489                         ounce = slice->cls_ops->clo_weigh(env, slice);
1490                         pound += ounce;
1491                         if (pound < ounce) /* overflow: saturate */
1492                                 pound = ~0UL;
1493                 }
1494         }
1495         return pound;
1496 }
1497 EXPORT_SYMBOL(cl_lock_weigh);
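/*
 * Worked example of the saturation above (illustrative numbers only): with a
 * 64-bit unsigned long, a running sum "pound" of 0xfffffffffffffff0 plus an
 * "ounce" of 0x20 wraps around to 0x10. Since 0x10 < 0x20 the overflow is
 * detected and the returned weight saturates to ~0UL instead of looking
 * artificially small.
 */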
1498
1499 /**
1500  * Notifies layers that lock description changed.
1501  *
1502  * The server can grant the client a lock different from the one that was
1503  * requested (e.g., larger in extent). This method is called when the actually
1504  * granted lock description becomes known, so that layers can accommodate the
1505  * changed description.
1506  *
1507  * \see cl_lock_operations::clo_modify()
1508  */
1509 int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
1510                    const struct cl_lock_descr *desc)
1511 {
1512         const struct cl_lock_slice *slice;
1513         struct cl_object           *obj = lock->cll_descr.cld_obj;
1514         struct cl_object_header    *hdr = cl_object_header(obj);
1515         int result;
1516
1517         cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
1518         /* don't allow object to change */
1519         LASSERT(obj == desc->cld_obj);
1520         LINVRNT(cl_lock_is_mutexed(lock));
1521         LINVRNT(cl_lock_invariant(env, lock));
1522
1523         list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1524                 if (slice->cls_ops->clo_modify != NULL) {
1525                         result = slice->cls_ops->clo_modify(env, slice, desc);
1526                         if (result != 0)
1527                                 return result;
1528                 }
1529         }
1530         CL_LOCK_DEBUG(D_DLMTRACE, env, lock, " -> "DDESCR"@"DFID"\n",
1531                       PDESCR(desc), PFID(lu_object_fid(&desc->cld_obj->co_lu)));
1532         /*
1533          * Just replace description in place. Nothing more is needed for
1534          * now. If locks were indexed according to their extent and/or mode,
1535          * that index would have to be updated here.
1536          */
1537         spin_lock(&hdr->coh_lock_guard);
1538         lock->cll_descr = *desc;
1539         spin_unlock(&hdr->coh_lock_guard);
1540         return 0;
1541 }
1542 EXPORT_SYMBOL(cl_lock_modify);
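/*
 * A minimal illustrative sketch (assumption: the caller holds the lock mutex
 * and has learned that the server granted a wider extent): build a
 * descriptor "granted" for the same object and push it down with
 * cl_lock_modify():
 *
 *	struct cl_lock_descr granted = lock->cll_descr;
 *
 *	granted.cld_start = 0;
 *	granted.cld_end   = CL_PAGE_EOF;
 *	rc = cl_lock_modify(env, lock, &granted);
 *
 * On success every layer has seen clo_modify() and lock->cll_descr carries
 * the granted extent.
 */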
1543
1544 /**
1545  * Initializes lock closure with a given origin.
1546  *
1547  * \see cl_lock_closure
1548  */
1549 void cl_lock_closure_init(const struct lu_env *env,
1550                           struct cl_lock_closure *closure,
1551                           struct cl_lock *origin, int wait)
1552 {
1553         LINVRNT(cl_lock_is_mutexed(origin));
1554         LINVRNT(cl_lock_invariant(env, origin));
1555
1556         INIT_LIST_HEAD(&closure->clc_list);
1557         closure->clc_origin = origin;
1558         closure->clc_wait   = wait;
1559         closure->clc_nr     = 0;
1560 }
1561 EXPORT_SYMBOL(cl_lock_closure_init);
1562
1563 /**
1564  * Builds a closure of \a lock.
1565  *
1566  * Building a closure consists of adding the initial lock (\a lock) to it
1567  * and calling the cl_lock_operations::clo_closure() methods of \a lock. These
1568  * methods may call cl_lock_closure_build() recursively, adding more locks to
1569  * the closure, and so on.
1570  *
1571  * \see cl_lock_closure
1572  */
1573 int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
1574                           struct cl_lock_closure *closure)
1575 {
1576         const struct cl_lock_slice *slice;
1577         int result;
1578
1579         LINVRNT(cl_lock_is_mutexed(closure->clc_origin));
1580         LINVRNT(cl_lock_invariant(env, closure->clc_origin));
1581
1582         result = cl_lock_enclosure(env, lock, closure);
1583         if (result == 0) {
1584                 list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1585                         if (slice->cls_ops->clo_closure != NULL) {
1586                                 result = slice->cls_ops->clo_closure(env, slice,
1587                                                                      closure);
1588                                 if (result != 0)
1589                                         break;
1590                         }
1591                 }
1592         }
1593         if (result != 0)
1594                 cl_lock_disclosure(env, closure);
1595         return result;
1596 }
1597 EXPORT_SYMBOL(cl_lock_closure_build);
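/*
 * A minimal illustrative sketch of the closure life-cycle built from the
 * helpers above (assumption: the caller already holds the mutex of
 * "origin"):
 *
 *	struct cl_lock_closure closure;
 *
 *	cl_lock_closure_init(env, &closure, origin, 1);
 *	rc = cl_lock_closure_build(env, lock, &closure);
 *	if (rc == 0) {
 *		...operate on the locks collected in closure.clc_list...
 *		cl_lock_disclosure(env, &closure);
 *	}
 *	cl_lock_closure_fini(&closure);
 *
 * A CLO_REPEAT result means the closure has already been disclosed and the
 * whole operation should be retried.
 */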
1598
1599 /**
1600  * Adds new lock to a closure.
1601  *
1602  * Try-locks \a lock and, if this succeeds, adds it to the closure (never
1603  * more than once). If the try-lock fails, returns CLO_REPEAT, optionally
1604  * after waiting until the next try-lock is likely to succeed.
1605  */
1606 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
1607                       struct cl_lock_closure *closure)
1608 {
1609         int result = 0;
1610
1611         cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
1612         if (!cl_lock_mutex_try(env, lock)) {
1613                 /*
1614                  * If lock->cll_inclosure is not empty, lock is already in
1615                  * this closure.
1616                  */
1617                 if (list_empty(&lock->cll_inclosure)) {
1618                         cl_lock_get_trust(lock);
1619                         lu_ref_add(&lock->cll_reference, "closure", closure);
1620                         list_add(&lock->cll_inclosure, &closure->clc_list);
1621                         closure->clc_nr++;
1622                 } else
1623                         cl_lock_mutex_put(env, lock);
1624                 result = 0;
1625         } else {
1626                 cl_lock_disclosure(env, closure);
1627                 if (closure->clc_wait) {
1628                         cl_lock_get_trust(lock);
1629                         lu_ref_add(&lock->cll_reference, "closure-w", closure);
1630                         cl_lock_mutex_put(env, closure->clc_origin);
1631
1632                         LASSERT(cl_lock_nr_mutexed(env) == 0);
1633                         cl_lock_mutex_get(env, lock);
1634                         cl_lock_mutex_put(env, lock);
1635
1636                         cl_lock_mutex_get(env, closure->clc_origin);
1637                         lu_ref_del(&lock->cll_reference, "closure-w", closure);
1638                         cl_lock_put(env, lock);
1639                 }
1640                 result = CLO_REPEAT;
1641         }
1642         return result;
1643 }
1644 EXPORT_SYMBOL(cl_lock_enclosure);
1645
1646 /** Releases mutexes of enclosed locks. */
1647 void cl_lock_disclosure(const struct lu_env *env,
1648                         struct cl_lock_closure *closure)
1649 {
1650         struct cl_lock *scan;
1651         struct cl_lock *temp;
1652
1653         cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
1654         list_for_each_entry_safe(scan, temp, &closure->clc_list,
1655                                      cll_inclosure) {
1656                 list_del_init(&scan->cll_inclosure);
1657                 cl_lock_mutex_put(env, scan);
1658                 lu_ref_del(&scan->cll_reference, "closure", closure);
1659                 cl_lock_put(env, scan);
1660                 closure->clc_nr--;
1661         }
1662         LASSERT(closure->clc_nr == 0);
1663 }
1664 EXPORT_SYMBOL(cl_lock_disclosure);
1665
1666 /** Finalizes a closure. */
1667 void cl_lock_closure_fini(struct cl_lock_closure *closure)
1668 {
1669         LASSERT(closure->clc_nr == 0);
1670         LASSERT(list_empty(&closure->clc_list));
1671 }
1672 EXPORT_SYMBOL(cl_lock_closure_fini);
1673
1674 /**
1675  * Destroys this lock. Notifies layers (bottom-to-top) that the lock is being
1676  * destroyed and then destroys it. If there are holds on the lock, destruction
1677  * is postponed until all holds are released. This is called when a decision is
1678  * made to destroy the lock in the future, e.g., when a blocking AST is
1679  * received for it, or a fatal communication error happens.
1680  *
1681  * The caller must hold a reference on this lock; otherwise a deleted lock
1682  * could linger in memory indefinitely, because nobody would call
1683  * cl_lock_put() to finish it.
1684  *
1685  * \pre atomic_read(&lock->cll_ref) > 0
1686  * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
1687  *         cl_lock_nr_mutexed(env) == 1)
1688  *      [i.e., if a top-lock is deleted, no other lock mutexes can be
1689  *      held, as deletion of sub-locks might require releasing a top-lock
1690  *      mutex]
1691  *
1692  * \see cl_lock_operations::clo_delete()
1693  * \see cl_lock::cll_holds
1694  */
1695 void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
1696 {
1697         LINVRNT(cl_lock_is_mutexed(lock));
1698         LINVRNT(cl_lock_invariant(env, lock));
1699         LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
1700                      cl_lock_nr_mutexed(env) == 1));
1701
1702         cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
1703         if (lock->cll_holds == 0)
1704                 cl_lock_delete0(env, lock);
1705         else
1706                 lock->cll_flags |= CLF_DOOMED;
1707 }
1708 EXPORT_SYMBOL(cl_lock_delete);
1709
1710 /**
1711  * Marks the lock as irrecoverably failed and marks it for destruction. This
1712  * happens when, e.g., the server fails to grant a lock to us, or a network
1713  * time-out happens.
1714  *
1715  * \pre atomic_read(&lock->cll_ref) > 0
1716  *
1717  * \see cl_lock_delete()
1718  * \see cl_lock::cll_holds
1719  */
1720 void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
1721 {
1722         LINVRNT(cl_lock_is_mutexed(lock));
1723         LINVRNT(cl_lock_invariant(env, lock));
1724
1725         if (lock->cll_error == 0 && error != 0) {
1726                 cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
1727                 lock->cll_error = error;
1728                 cl_lock_signal(env, lock);
1729                 cl_lock_cancel(env, lock);
1730                 cl_lock_delete(env, lock);
1731         }
1732 }
1733 EXPORT_SYMBOL(cl_lock_error);
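/*
 * A minimal illustrative sketch (assumption: a layer holds the lock mutex
 * and has a negative status "rc", e.g. from a server reply) of reporting a
 * failed lock:
 *
 *	if (rc < 0)
 *		cl_lock_error(env, lock, rc);
 *
 * This wakes up waiters (cl_wait_try() then returns the error), cancels the
 * lock and schedules it for destruction.
 */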
1734
1735 /**
1736  * Cancels this lock. Notifies layers
1737  * (bottom-to-top) that the lock is being cancelled, then destroys it. If
1738  * there are holds on the lock, cancellation is postponed until
1739  * all holds are released.
1740  *
1741  * Cancellation notification is delivered to layers at most once.
1742  *
1743  * \see cl_lock_operations::clo_cancel()
1744  * \see cl_lock::cll_holds
1745  */
1746 void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
1747 {
1748         LINVRNT(cl_lock_is_mutexed(lock));
1749         LINVRNT(cl_lock_invariant(env, lock));
1750
1751         cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
1752         if (lock->cll_holds == 0)
1753                 cl_lock_cancel0(env, lock);
1754         else
1755                 lock->cll_flags |= CLF_CANCELPEND;
1756 }
1757 EXPORT_SYMBOL(cl_lock_cancel);
1758
1759 /**
1760  * Finds an existing lock covering the given index, optionally different from
1761  * a given \a except lock.
1762  */
1763 struct cl_lock *cl_lock_at_pgoff(const struct lu_env *env,
1764                                  struct cl_object *obj, pgoff_t index,
1765                                  struct cl_lock *except,
1766                                  int pending, int canceld)
1767 {
1768         struct cl_object_header *head;
1769         struct cl_lock    *scan;
1770         struct cl_lock    *lock;
1771         struct cl_lock_descr    *need;
1772
1773         head = cl_object_header(obj);
1774         need = &cl_env_info(env)->clt_descr;
1775         lock = NULL;
1776
1777         need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
1778                                     * not PHANTOM */
1779         need->cld_start = need->cld_end = index;
1780         need->cld_enq_flags = 0;
1781
1782         spin_lock(&head->coh_lock_guard);
1783         /* It is fine to match any group lock since there can be only one
1784          * with a unique gid, and it conflicts with all other lock modes too */
1785         list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
1786                 if (scan != except &&
1787                     (scan->cll_descr.cld_mode == CLM_GROUP ||
1788                     cl_lock_ext_match(&scan->cll_descr, need)) &&
1789                     scan->cll_state >= CLS_HELD &&
1790                     scan->cll_state < CLS_FREEING &&
1791                     /*
1792                      * This check is racy as the lock can be canceled right
1793                      * after it is done, but this is fine, because page exists
1794                      * already.
1795                      */
1796                     (canceld || !(scan->cll_flags & CLF_CANCELLED)) &&
1797                     (pending || !(scan->cll_flags & CLF_CANCELPEND))) {
1798                         /* Don't increase cs_hit here since this
1799                          * is just a helper function. */
1800                         cl_lock_get_trust(scan);
1801                         lock = scan;
1802                         break;
1803                 }
1804         }
1805         spin_unlock(&head->coh_lock_guard);
1806         return lock;
1807 }
1808 EXPORT_SYMBOL(cl_lock_at_pgoff);
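/*
 * A minimal illustrative sketch (assumptions: "obj", a page index "idx" and
 * an "except" lock exist in the caller) of checking whether some other lock
 * still covers the index, skipping locks with a pending cancellation:
 *
 *	found = cl_lock_at_pgoff(env, obj, idx, except, 0, 0);
 *	if (found != NULL) {
 *		...the page at idx is still protected...
 *		cl_lock_put(env, found);
 *	}
 *
 * The trusted reference taken inside cl_lock_at_pgoff() must be dropped with
 * cl_lock_put() once the result is no longer needed.
 */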
1809
1810 /**
1811  * Calculates the page offset at the layer of @lock.
1812  * At the time of this writing, @page is a top page and @lock is a sub-lock.
1813  */
1814 static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock)
1815 {
1816         struct lu_device_type *dtype;
1817         const struct cl_page_slice *slice;
1818
1819         dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type;
1820         slice = cl_page_at(page, dtype);
1821         LASSERT(slice != NULL);
1822         return slice->cpl_page->cp_index;
1823 }
1824
1825 /**
1826  * Checks whether page @page is covered by another lock; if not, discards it.
1827  */
1828 static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
1829                                 struct cl_page *page, void *cbdata)
1830 {
1831         struct cl_thread_info *info = cl_env_info(env);
1832         struct cl_lock *lock = cbdata;
1833         pgoff_t index = pgoff_at_lock(page, lock);
1834
1835         if (index >= info->clt_fn_index) {
1836                 struct cl_lock *tmp;
1837
1838                 /* refresh non-overlapped index */
1839                 tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
1840                                         lock, 1, 0);
1841                 if (tmp != NULL) {
1842                         /* Cache the first-non-overlapped index so as to skip
1843                          * all pages within [index, clt_fn_index). This
1844                          * is safe because if tmp lock is canceled, it will
1845                          * discard these pages. */
1846                         info->clt_fn_index = tmp->cll_descr.cld_end + 1;
1847                         if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
1848                                 info->clt_fn_index = CL_PAGE_EOF;
1849                         cl_lock_put(env, tmp);
1850                 } else if (cl_page_own(env, io, page) == 0) {
1851                         /* discard the page */
1852                         cl_page_unmap(env, io, page);
1853                         cl_page_discard(env, io, page);
1854                         cl_page_disown(env, io, page);
1855                 } else {
1856                         LASSERT(page->cp_state == CPS_FREEING);
1857                 }
1858         }
1859
1860         info->clt_next_index = index + 1;
1861         return CLP_GANG_OKAY;
1862 }
1863
1864 static int discard_cb(const struct lu_env *env, struct cl_io *io,
1865                       struct cl_page *page, void *cbdata)
1866 {
1867         struct cl_thread_info *info = cl_env_info(env);
1868         struct cl_lock *lock   = cbdata;
1869
1870         LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
1871         KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
1872                       !PageWriteback(cl_page_vmpage(env, page))));
1873         KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
1874                       !PageDirty(cl_page_vmpage(env, page))));
1875
1876         info->clt_next_index = pgoff_at_lock(page, lock) + 1;
1877         if (cl_page_own(env, io, page) == 0) {
1878                 /* discard the page */
1879                 cl_page_unmap(env, io, page);
1880                 cl_page_discard(env, io, page);
1881                 cl_page_disown(env, io, page);
1882         } else {
1883                 LASSERT(page->cp_state == CPS_FREEING);
1884         }
1885
1886         return CLP_GANG_OKAY;
1887 }
1888
1889 /**
1890  * Discards pages protected by the given lock. This function traverses the
1891  * radix tree to find all covered pages and discards them. If a page is also
1892  * covered by another lock, it remains in the cache.
1893  *
1894  * If an error occurs at any step, the process continues anyway (the reasoning
1895  * being that lock cancellation cannot be delayed indefinitely).
1896  */
1897 int cl_lock_discard_pages(const struct lu_env *env, struct cl_lock *lock)
1898 {
1899         struct cl_thread_info *info  = cl_env_info(env);
1900         struct cl_io      *io    = &info->clt_io;
1901         struct cl_lock_descr  *descr = &lock->cll_descr;
1902         cl_page_gang_cb_t      cb;
1903         int res;
1904         int result;
1905
1906         LINVRNT(cl_lock_invariant(env, lock));
1907
1908         io->ci_obj = cl_object_top(descr->cld_obj);
1909         io->ci_ignore_layout = 1;
1910         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1911         if (result != 0)
1912                 goto out;
1913
1914         cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
1915         info->clt_fn_index = info->clt_next_index = descr->cld_start;
1916         do {
1917                 res = cl_page_gang_lookup(env, descr->cld_obj, io,
1918                                           info->clt_next_index, descr->cld_end,
1919                                           cb, (void *)lock);
1920                 if (info->clt_next_index > descr->cld_end)
1921                         break;
1922
1923                 if (res == CLP_GANG_RESCHED)
1924                         cond_resched();
1925         } while (res != CLP_GANG_OKAY);
1926 out:
1927         cl_io_fini(env, io);
1928         return result;
1929 }
1930 EXPORT_SYMBOL(cl_lock_discard_pages);
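/*
 * A minimal illustrative sketch (assumption: called from a layer's
 * cancellation path, with "env" and "lock" available) of flushing the pages
 * covered by a lock before the underlying DLM lock goes away:
 *
 *	rc = cl_lock_discard_pages(env, lock);
 *
 * For CLM_READ locks only pages not covered by another lock are discarded
 * (check_and_discard_cb()); for write-capable modes every covered page is
 * discarded (discard_cb()).
 */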
1931
1932 /**
1933  * Eliminates all locks for a given object.
1934  *
1935  * The caller has to guarantee that no lock is in active use.
1936  *
1937  * \param cancel when this is set, cl_locks_prune() cancels locks before
1938  *             destroying them.
1939  */
1940 void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
1941 {
1942         struct cl_object_header *head;
1943         struct cl_lock    *lock;
1944
1945         head = cl_object_header(obj);
1946         /*
1947          * If locks are destroyed without cancellation, all pages must be
1948          * already destroyed (as otherwise they will be left unprotected).
1949          */
1950         LASSERT(ergo(!cancel,
1951                      head->coh_tree.rnode == NULL && head->coh_pages == 0));
1952
1953         spin_lock(&head->coh_lock_guard);
1954         while (!list_empty(&head->coh_locks)) {
1955                 lock = container_of(head->coh_locks.next,
1956                                     struct cl_lock, cll_linkage);
1957                 cl_lock_get_trust(lock);
1958                 spin_unlock(&head->coh_lock_guard);
1959                 lu_ref_add(&lock->cll_reference, "prune", current);
1960
1961 again:
1962                 cl_lock_mutex_get(env, lock);
1963                 if (lock->cll_state < CLS_FREEING) {
1964                         LASSERT(lock->cll_users <= 1);
1965                         if (unlikely(lock->cll_users == 1)) {
1966                                 struct l_wait_info lwi = { 0 };
1967
1968                                 cl_lock_mutex_put(env, lock);
1969                                 l_wait_event(lock->cll_wq,
1970                                              lock->cll_users == 0,
1971                                              &lwi);
1972                                 goto again;
1973                         }
1974
1975                         if (cancel)
1976                                 cl_lock_cancel(env, lock);
1977                         cl_lock_delete(env, lock);
1978                 }
1979                 cl_lock_mutex_put(env, lock);
1980                 lu_ref_del(&lock->cll_reference, "prune", current);
1981                 cl_lock_put(env, lock);
1982                 spin_lock(&head->coh_lock_guard);
1983         }
1984         spin_unlock(&head->coh_lock_guard);
1985 }
1986 EXPORT_SYMBOL(cl_locks_prune);
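/*
 * A minimal illustrative sketch (assumption: "obj" is being torn down and no
 * lock is in active use any more):
 *
 *	cl_locks_prune(env, obj, 1);
 *
 * Passing 0 for \a cancel is only legal once all covered pages have already
 * been destroyed, as asserted at the top of the function.
 */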
1987
1988 static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
1989                                           const struct cl_io *io,
1990                                           const struct cl_lock_descr *need,
1991                                           const char *scope, const void *source)
1992 {
1993         struct cl_lock *lock;
1994
1995         while (1) {
1996                 lock = cl_lock_find(env, io, need);
1997                 if (IS_ERR(lock))
1998                         break;
1999                 cl_lock_mutex_get(env, lock);
2000                 if (lock->cll_state < CLS_FREEING &&
2001                     !(lock->cll_flags & CLF_CANCELLED)) {
2002                         cl_lock_hold_mod(env, lock, 1);
2003                         lu_ref_add(&lock->cll_holders, scope, source);
2004                         lu_ref_add(&lock->cll_reference, scope, source);
2005                         break;
2006                 }
2007                 cl_lock_mutex_put(env, lock);
2008                 cl_lock_put(env, lock);
2009         }
2010         return lock;
2011 }
2012
2013 /**
2014  * Returns a lock matching \a need description with a reference and a hold on
2015  * it.
2016  *
2017  * This is much like cl_lock_find(), except that cl_lock_hold() additionally
2018  * guarantees that the lock is not in the CLS_FREEING state on return.
2019  */
2020 struct cl_lock *cl_lock_hold(const struct lu_env *env, const struct cl_io *io,
2021                              const struct cl_lock_descr *need,
2022                              const char *scope, const void *source)
2023 {
2024         struct cl_lock *lock;
2025
2026         lock = cl_lock_hold_mutex(env, io, need, scope, source);
2027         if (!IS_ERR(lock))
2028                 cl_lock_mutex_put(env, lock);
2029         return lock;
2030 }
2031 EXPORT_SYMBOL(cl_lock_hold);
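/*
 * A minimal illustrative sketch (assumptions: "io" and a filled-in
 * descriptor "need" exist; the scope string and source cookie are
 * caller-chosen and only feed lu_ref tracking):
 *
 *	lock = cl_lock_hold(env, io, need, "sketch", current);
 *	if (!IS_ERR(lock)) {
 *		...use the lock...
 *		cl_lock_release(env, lock, "sketch", current);
 *	}
 *
 * cl_lock_hold() pairs with cl_lock_release(), which re-takes the lock mutex
 * internally, or with cl_lock_unhold() if the caller holds the mutex itself.
 */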
2032
2033 /**
2034  * Main high-level entry point of the cl_lock interface that finds an
2035  * existing lock or enqueues a new lock matching the given description.
2036  */
2037 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
2038                                 const struct cl_lock_descr *need,
2039                                 const char *scope, const void *source)
2040 {
2041         struct cl_lock       *lock;
2042         int                rc;
2043         __u32            enqflags = need->cld_enq_flags;
2044
2045         do {
2046                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
2047                 if (IS_ERR(lock))
2048                         break;
2049
2050                 rc = cl_enqueue_locked(env, lock, io, enqflags);
2051                 if (rc == 0) {
2052                         if (cl_lock_fits_into(env, lock, need, io)) {
2053                                 if (!(enqflags & CEF_AGL)) {
2054                                         cl_lock_mutex_put(env, lock);
2055                                         cl_lock_lockdep_acquire(env, lock,
2056                                                                 enqflags);
2057                                         break;
2058                                 }
2059                                 rc = 1;
2060                         }
2061                         cl_unuse_locked(env, lock);
2062                 }
2063                 cl_lock_trace(D_DLMTRACE, env,
2064                       rc <= 0 ? "enqueue failed" : "agl succeeded", lock);
2065                 cl_lock_hold_release(env, lock, scope, source);
2066                 cl_lock_mutex_put(env, lock);
2067                 lu_ref_del(&lock->cll_reference, scope, source);
2068                 cl_lock_put(env, lock);
2069                 if (rc > 0) {
2070                         LASSERT(enqflags & CEF_AGL);
2071                         lock = NULL;
2072                 } else if (rc != 0) {
2073                         lock = ERR_PTR(rc);
2074                 }
2075         } while (rc == 0);
2076         return lock;
2077 }
2078 EXPORT_SYMBOL(cl_lock_request);
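/*
 * A minimal illustrative sketch of the common request/release pattern built
 * on the function above (assumptions: "io" and a descriptor "need" without
 * CEF_AGL exist in the caller):
 *
 *	lock = cl_lock_request(env, io, need, "sketch", current);
 *	if (IS_ERR(lock))
 *		return PTR_ERR(lock);
 *	...do the I/O under the granted lock...
 *	cl_lock_release(env, lock, "sketch", current);
 *
 * When CEF_AGL is set in need->cld_enq_flags the function can also return
 * NULL (the rc > 0 branch above), so AGL callers must check for that too.
 */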
2079
2080 /**
2081  * Adds a hold to a known lock.
2082  */
2083 void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
2084                       const char *scope, const void *source)
2085 {
2086         LINVRNT(cl_lock_is_mutexed(lock));
2087         LINVRNT(cl_lock_invariant(env, lock));
2088         LASSERT(lock->cll_state != CLS_FREEING);
2089
2090         cl_lock_hold_mod(env, lock, 1);
2091         cl_lock_get(lock);
2092         lu_ref_add(&lock->cll_holders, scope, source);
2093         lu_ref_add(&lock->cll_reference, scope, source);
2094 }
2095 EXPORT_SYMBOL(cl_lock_hold_add);
2096
2097 /**
2098  * Releases a hold and a reference on a lock on which the caller has
2099  * acquired the mutex.
2100  */
2101 void cl_lock_unhold(const struct lu_env *env, struct cl_lock *lock,
2102                     const char *scope, const void *source)
2103 {
2104         LINVRNT(cl_lock_invariant(env, lock));
2105         cl_lock_hold_release(env, lock, scope, source);
2106         lu_ref_del(&lock->cll_reference, scope, source);
2107         cl_lock_put(env, lock);
2108 }
2109 EXPORT_SYMBOL(cl_lock_unhold);
2110
2111 /**
2112  * Releases a hold and a reference on a lock, obtained by cl_lock_hold().
2113  */
2114 void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
2115                      const char *scope, const void *source)
2116 {
2117         LINVRNT(cl_lock_invariant(env, lock));
2118         cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
2119         cl_lock_mutex_get(env, lock);
2120         cl_lock_hold_release(env, lock, scope, source);
2121         cl_lock_mutex_put(env, lock);
2122         lu_ref_del(&lock->cll_reference, scope, source);
2123         cl_lock_put(env, lock);
2124 }
2125 EXPORT_SYMBOL(cl_lock_release);
2126
2127 void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
2128 {
2129         LINVRNT(cl_lock_is_mutexed(lock));
2130         LINVRNT(cl_lock_invariant(env, lock));
2131
2132         cl_lock_used_mod(env, lock, 1);
2133 }
2134 EXPORT_SYMBOL(cl_lock_user_add);
2135
2136 void cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
2137 {
2138         LINVRNT(cl_lock_is_mutexed(lock));
2139         LINVRNT(cl_lock_invariant(env, lock));
2140         LASSERT(lock->cll_users > 0);
2141
2142         cl_lock_used_mod(env, lock, -1);
2143         if (lock->cll_users == 0)
2144                 wake_up_all(&lock->cll_wq);
2145 }
2146 EXPORT_SYMBOL(cl_lock_user_del);
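/*
 * A minimal illustrative sketch: cl_lock_user_add() and cl_lock_user_del()
 * bracket the period during which an I/O actively uses the lock; both expect
 * the lock mutex to be held:
 *
 *	cl_lock_mutex_get(env, lock);
 *	cl_lock_user_add(env, lock);
 *	cl_lock_mutex_put(env, lock);
 *	...the I/O runs...
 *	cl_lock_mutex_get(env, lock);
 *	cl_lock_user_del(env, lock);
 *	cl_lock_mutex_put(env, lock);
 *
 * When the last user goes away cl_lock_user_del() wakes up threads sleeping
 * on cll_wq, e.g. the waiter in cl_locks_prune().
 */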
2147
2148 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
2149 {
2150         static const char * const names[] = {
2151                 [CLM_PHANTOM] = "P",
2152                 [CLM_READ]    = "R",
2153                 [CLM_WRITE]   = "W",
2154                 [CLM_GROUP]   = "G"
2155         };
2156         if (0 <= mode && mode < ARRAY_SIZE(names))
2157                 return names[mode];
2158         else
2159                 return "U";
2160 }
2161 EXPORT_SYMBOL(cl_lock_mode_name);
2162
2163 /**
2164  * Prints human readable representation of a lock description.
2165  */
2166 void cl_lock_descr_print(const struct lu_env *env, void *cookie,
2167                        lu_printer_t printer,
2168                        const struct cl_lock_descr *descr)
2169 {
2170         const struct lu_fid  *fid;
2171
2172         fid = lu_object_fid(&descr->cld_obj->co_lu);
2173         (*printer)(env, cookie, DDESCR"@"DFID, PDESCR(descr), PFID(fid));
2174 }
2175 EXPORT_SYMBOL(cl_lock_descr_print);
2176
2177 /**
2178  * Prints human readable representation of \a lock to the \a f.
2179  */
2180 void cl_lock_print(const struct lu_env *env, void *cookie,
2181                    lu_printer_t printer, const struct cl_lock *lock)
2182 {
2183         const struct cl_lock_slice *slice;
2184         (*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
2185                    lock, atomic_read(&lock->cll_ref),
2186                    lock->cll_state, lock->cll_error, lock->cll_holds,
2187                    lock->cll_users, lock->cll_flags);
2188         cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
2189         (*printer)(env, cookie, " {\n");
2190
2191         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
2192                 (*printer)(env, cookie, "    %s@%p: ",
2193                            slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
2194                            slice);
2195                 if (slice->cls_ops->clo_print != NULL)
2196                         slice->cls_ops->clo_print(env, cookie, printer, slice);
2197                 (*printer)(env, cookie, "\n");
2198         }
2199         (*printer)(env, cookie, "} lock@%p\n", lock);
2200 }
2201 EXPORT_SYMBOL(cl_lock_print);
2202
2203 int cl_lock_init(void)
2204 {
2205         return lu_kmem_init(cl_lock_caches);
2206 }
2207
2208 void cl_lock_fini(void)
2209 {
2210         lu_kmem_fini(cl_lock_caches);
2211 }