Merge branch 'yem/kconfig-for-next' of git://gitorious.org/linux-kconfig/linux-kconfi...
[firefly-linux-kernel-4.4.55.git] / drivers / staging / lustre / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LMV
38 #include <linux/slab.h>
39 #include <linux/module.h>
40 #include <linux/init.h>
41 #include <linux/pagemap.h>
42 #include <linux/mm.h>
43 #include <asm/div64.h>
44 #include <linux/seq_file.h>
45 #include <linux/namei.h>
46 #include <asm/uaccess.h>
47
48 #include <lustre/lustre_idl.h>
49 #include <obd_support.h>
50 #include <lustre_lib.h>
51 #include <lustre_net.h>
52 #include <obd_class.h>
53 #include <lprocfs_status.h>
54 #include <lustre_lite.h>
55 #include <lustre_fid.h>
56 #include "lmv_internal.h"
57
58 static void lmv_activate_target(struct lmv_obd *lmv,
59                                 struct lmv_tgt_desc *tgt,
60                                 int activate)
61 {
62         if (tgt->ltd_active == activate)
63                 return;
64
65         tgt->ltd_active = activate;
66         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
67 }
68
69 /**
70  * Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
76 static int lmv_set_mdc_active(struct lmv_obd *lmv, struct obd_uuid *uuid,
77                               int activate)
78 {
79         struct lmv_tgt_desc    *uninitialized_var(tgt);
80         struct obd_device      *obd;
81         int                  i;
82         int                  rc = 0;
83
84         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85                lmv, uuid->uuid, activate);
86
87         spin_lock(&lmv->lmv_lock);
88         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
89                 tgt = lmv->tgts[i];
90                 if (tgt == NULL || tgt->ltd_exp == NULL)
91                         continue;
92
93                 CDEBUG(D_INFO, "Target idx %d is %s conn "LPX64"\n", i,
94                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
95
96                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
97                         break;
98         }
99
100         if (i == lmv->desc.ld_tgt_count)
101                 GOTO(out_lmv_lock, rc = -EINVAL);
102
103         obd = class_exp2obd(tgt->ltd_exp);
104         if (obd == NULL)
105                 GOTO(out_lmv_lock, rc = -ENOTCONN);
106
107         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
108                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
109                obd->obd_type->typ_name, i);
110         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
111
112         if (tgt->ltd_active == activate) {
113                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
114                        activate ? "" : "in");
115                 GOTO(out_lmv_lock, rc);
116         }
117
118         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
119                activate ? "" : "in");
120         lmv_activate_target(lmv, tgt, activate);
121
122  out_lmv_lock:
123         spin_unlock(&lmv->lmv_lock);
124         return rc;
125 }
126
127 struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
128 {
129         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
130
131         return obd_get_uuid(lmv->tgts[0]->ltd_exp);
132 }
133
134 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
135                       enum obd_notify_event ev, void *data)
136 {
137         struct obd_connect_data *conn_data;
138         struct lmv_obd    *lmv = &obd->u.lmv;
139         struct obd_uuid  *uuid;
140         int                   rc = 0;
141
142         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
143                 CERROR("unexpected notification of %s %s!\n",
144                        watched->obd_type->typ_name,
145                        watched->obd_name);
146                 return -EINVAL;
147         }
148
149         uuid = &watched->u.cli.cl_target_uuid;
150         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
151                 /*
152                  * Set MDC as active before notifying the observer, so the
153                  * observer can use the MDC normally.
154                  */
155                 rc = lmv_set_mdc_active(lmv, uuid,
156                                         ev == OBD_NOTIFY_ACTIVE);
157                 if (rc) {
158                         CERROR("%sactivation of %s failed: %d\n",
159                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
160                                uuid->uuid, rc);
161                         return rc;
162                 }
163         } else if (ev == OBD_NOTIFY_OCD) {
164                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
165                 /*
166                  * XXX: Make sure that ocd_connect_flags from all targets are
167                  * the same. Otherwise one of MDTs runs wrong version or
168                  * something like this.  --umka
169                  */
170                 obd->obd_self_export->exp_connect_data = *conn_data;
171         }
172 #if 0
173         else if (ev == OBD_NOTIFY_DISCON) {
174                 /*
175                  * For disconnect event, flush fld cache for failout MDS case.
176                  */
177                 fld_client_flush(&lmv->lmv_fld);
178         }
179 #endif
180         /*
181          * Pass the notification up the chain.
182          */
183         if (obd->obd_observer)
184                 rc = obd_notify(obd->obd_observer, watched, ev, data);
185
186         return rc;
187 }
188
189 /**
190  * This is fake connect function. Its purpose is to initialize lmv and say
191  * caller that everything is okay. Real connection will be performed later.
192  */
193 static int lmv_connect(const struct lu_env *env,
194                        struct obd_export **exp, struct obd_device *obd,
195                        struct obd_uuid *cluuid, struct obd_connect_data *data,
196                        void *localdata)
197 {
198         struct proc_dir_entry *lmv_proc_dir;
199         struct lmv_obd  *lmv = &obd->u.lmv;
200         struct lustre_handle  conn = { 0 };
201         int                 rc = 0;
202
203         /*
204          * We don't want to actually do the underlying connections more than
205          * once, so keep track.
206          */
207         lmv->refcount++;
208         if (lmv->refcount > 1) {
209                 *exp = NULL;
210                 return 0;
211         }
212
213         rc = class_connect(&conn, obd, cluuid);
214         if (rc) {
215                 CERROR("class_connection() returned %d\n", rc);
216                 return rc;
217         }
218
219         *exp = class_conn2export(&conn);
220         class_export_get(*exp);
221
222         lmv->exp = *exp;
223         lmv->connected = 0;
224         lmv->cluuid = *cluuid;
225
226         if (data)
227                 lmv->conn_data = *data;
228
229         if (obd->obd_proc_private != NULL) {
230                 lmv_proc_dir = obd->obd_proc_private;
231         } else {
232                 lmv_proc_dir = lprocfs_register("target_obds", obd->obd_proc_entry,
233                                                 NULL, NULL);
234                 if (IS_ERR(lmv_proc_dir)) {
235                         CERROR("could not register /proc/fs/lustre/%s/%s/target_obds.",
236                                obd->obd_type->typ_name, obd->obd_name);
237                         lmv_proc_dir = NULL;
238                 }
239                 obd->obd_proc_private = lmv_proc_dir;
240         }
241
242         /*
243          * All real clients should perform actual connection right away, because
244          * it is possible, that LMV will not have opportunity to connect targets
245          * and MDC stuff will be called directly, for instance while reading
246          * ../mdc/../kbytesfree procfs file, etc.
247          */
248         if (data->ocd_connect_flags & OBD_CONNECT_REAL)
249                 rc = lmv_check_connect(obd);
250
251         if (rc && lmv_proc_dir) {
252                 lprocfs_remove(&lmv_proc_dir);
253                 obd->obd_proc_private = NULL;
254         }
255
256         return rc;
257 }
258
259 static void lmv_set_timeouts(struct obd_device *obd)
260 {
261         struct lmv_tgt_desc   *tgt;
262         struct lmv_obd  *lmv;
263         int                 i;
264
265         lmv = &obd->u.lmv;
266         if (lmv->server_timeout == 0)
267                 return;
268
269         if (lmv->connected == 0)
270                 return;
271
272         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
273                 tgt = lmv->tgts[i];
274                 if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
275                         continue;
276
277                 obd_set_info_async(NULL, tgt->ltd_exp, sizeof(KEY_INTERMDS),
278                                    KEY_INTERMDS, 0, NULL, NULL);
279         }
280 }
281
282 static int lmv_init_ea_size(struct obd_export *exp, int easize,
283                             int def_easize, int cookiesize)
284 {
285         struct obd_device   *obd = exp->exp_obd;
286         struct lmv_obd      *lmv = &obd->u.lmv;
287         int               i;
288         int               rc = 0;
289         int               change = 0;
290
291         if (lmv->max_easize < easize) {
292                 lmv->max_easize = easize;
293                 change = 1;
294         }
295         if (lmv->max_def_easize < def_easize) {
296                 lmv->max_def_easize = def_easize;
297                 change = 1;
298         }
299         if (lmv->max_cookiesize < cookiesize) {
300                 lmv->max_cookiesize = cookiesize;
301                 change = 1;
302         }
303         if (change == 0)
304                 return 0;
305
306         if (lmv->connected == 0)
307                 return 0;
308
309         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
310                 if (lmv->tgts[i] == NULL ||
311                     lmv->tgts[i]->ltd_exp == NULL ||
312                     lmv->tgts[i]->ltd_active == 0) {
313                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
314                         continue;
315                 }
316
317                 rc = md_init_ea_size(lmv->tgts[i]->ltd_exp, easize, def_easize,
318                                      cookiesize);
319                 if (rc) {
320                         CERROR("%s: obd_init_ea_size() failed on MDT target %d:"
321                                " rc = %d.\n", obd->obd_name, i, rc);
322                         break;
323                 }
324         }
325         return rc;
326 }
327
328 #define MAX_STRING_SIZE 128
329
330 int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
331 {
332         struct proc_dir_entry   *lmv_proc_dir;
333         struct lmv_obd    *lmv = &obd->u.lmv;
334         struct obd_uuid  *cluuid = &lmv->cluuid;
335         struct obd_uuid   lmv_mdc_uuid = { "LMV_MDC_UUID" };
336         struct obd_device       *mdc_obd;
337         struct obd_export       *mdc_exp;
338         struct lu_fld_target     target;
339         int                   rc;
340
341         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
342                                         &obd->obd_uuid);
343         if (!mdc_obd) {
344                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
345                 return -EINVAL;
346         }
347
348         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
349                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
350                 tgt->ltd_uuid.uuid, obd->obd_uuid.uuid,
351                 cluuid->uuid);
352
353         if (!mdc_obd->obd_set_up) {
354                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
355                 return -EINVAL;
356         }
357
358         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
359                          &lmv->conn_data, NULL);
360         if (rc) {
361                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
362                 return rc;
363         }
364
365         /*
366          * Init fid sequence client for this mdc and add new fld target.
367          */
368         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
369         if (rc)
370                 return rc;
371
372         target.ft_srv = NULL;
373         target.ft_exp = mdc_exp;
374         target.ft_idx = tgt->ltd_idx;
375
376         fld_client_add_target(&lmv->lmv_fld, &target);
377
378         rc = obd_register_observer(mdc_obd, obd);
379         if (rc) {
380                 obd_disconnect(mdc_exp);
381                 CERROR("target %s register_observer error %d\n",
382                        tgt->ltd_uuid.uuid, rc);
383                 return rc;
384         }
385
386         if (obd->obd_observer) {
387                 /*
388                  * Tell the observer about the new target.
389                  */
390                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
391                                 OBD_NOTIFY_ACTIVE,
392                                 (void *)(tgt - lmv->tgts[0]));
393                 if (rc) {
394                         obd_disconnect(mdc_exp);
395                         return rc;
396                 }
397         }
398
399         tgt->ltd_active = 1;
400         tgt->ltd_exp = mdc_exp;
401         lmv->desc.ld_active_tgt_count++;
402
403         md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
404                         lmv->max_def_easize, lmv->max_cookiesize);
405
406         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
407                 mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
408                 atomic_read(&obd->obd_refcount));
409
410         lmv_proc_dir = obd->obd_proc_private;
411         if (lmv_proc_dir) {
412                 struct proc_dir_entry *mdc_symlink;
413
414                 LASSERT(mdc_obd->obd_type != NULL);
415                 LASSERT(mdc_obd->obd_type->typ_name != NULL);
416                 mdc_symlink = lprocfs_add_symlink(mdc_obd->obd_name,
417                                                   lmv_proc_dir,
418                                                   "../../../%s/%s",
419                                                   mdc_obd->obd_type->typ_name,
420                                                   mdc_obd->obd_name);
421                 if (mdc_symlink == NULL) {
422                         CERROR("Could not register LMV target "
423                                "/proc/fs/lustre/%s/%s/target_obds/%s.",
424                                obd->obd_type->typ_name, obd->obd_name,
425                                mdc_obd->obd_name);
426                         lprocfs_remove(&lmv_proc_dir);
427                         obd->obd_proc_private = NULL;
428                 }
429         }
430         return 0;
431 }
432
433 static void lmv_del_target(struct lmv_obd *lmv, int index)
434 {
435         if (lmv->tgts[index] == NULL)
436                 return;
437
438         OBD_FREE_PTR(lmv->tgts[index]);
439         lmv->tgts[index] = NULL;
440         return;
441 }
442
443 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
444                            __u32 index, int gen)
445 {
446         struct lmv_obd      *lmv = &obd->u.lmv;
447         struct lmv_tgt_desc *tgt;
448         int               rc = 0;
449
450         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
451
452         lmv_init_lock(lmv);
453
454         if (lmv->desc.ld_tgt_count == 0) {
455                 struct obd_device *mdc_obd;
456
457                 mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
458                                                 &obd->obd_uuid);
459                 if (!mdc_obd) {
460                         lmv_init_unlock(lmv);
461                         CERROR("%s: Target %s not attached: rc = %d\n",
462                                obd->obd_name, uuidp->uuid, -EINVAL);
463                         return -EINVAL;
464                 }
465         }
466
467         if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) {
468                 tgt = lmv->tgts[index];
469                 CERROR("%s: UUID %s already assigned at LOV target index %d:"
470                        " rc = %d\n", obd->obd_name,
471                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
472                 lmv_init_unlock(lmv);
473                 return -EEXIST;
474         }
475
476         if (index >= lmv->tgts_size) {
477                 /* We need to reallocate the lmv target array. */
478                 struct lmv_tgt_desc **newtgts, **old = NULL;
479                 __u32 newsize = 1;
480                 __u32 oldsize = 0;
481
482                 while (newsize < index + 1)
483                         newsize = newsize << 1;
484                 OBD_ALLOC(newtgts, sizeof(*newtgts) * newsize);
485                 if (newtgts == NULL) {
486                         lmv_init_unlock(lmv);
487                         return -ENOMEM;
488                 }
489
490                 if (lmv->tgts_size) {
491                         memcpy(newtgts, lmv->tgts,
492                                sizeof(*newtgts) * lmv->tgts_size);
493                         old = lmv->tgts;
494                         oldsize = lmv->tgts_size;
495                 }
496
497                 lmv->tgts = newtgts;
498                 lmv->tgts_size = newsize;
499                 smp_rmb();
500                 if (old)
501                         OBD_FREE(old, sizeof(*old) * oldsize);
502
503                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
504                        lmv->tgts_size);
505         }
506
507         OBD_ALLOC_PTR(tgt);
508         if (!tgt) {
509                 lmv_init_unlock(lmv);
510                 return -ENOMEM;
511         }
512
513         mutex_init(&tgt->ltd_fid_mutex);
514         tgt->ltd_idx = index;
515         tgt->ltd_uuid = *uuidp;
516         tgt->ltd_active = 0;
517         lmv->tgts[index] = tgt;
518         if (index >= lmv->desc.ld_tgt_count)
519                 lmv->desc.ld_tgt_count = index + 1;
520
521         if (lmv->connected) {
522                 rc = lmv_connect_mdc(obd, tgt);
523                 if (rc) {
524                         spin_lock(&lmv->lmv_lock);
525                         lmv->desc.ld_tgt_count--;
526                         memset(tgt, 0, sizeof(*tgt));
527                         spin_unlock(&lmv->lmv_lock);
528                 } else {
529                         int easize = sizeof(struct lmv_stripe_md) +
530                                      lmv->desc.ld_tgt_count *
531                                      sizeof(struct lu_fid);
532                         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
533                 }
534         }
535
536         lmv_init_unlock(lmv);
537         return rc;
538 }
539
540 int lmv_check_connect(struct obd_device *obd)
541 {
542         struct lmv_obd       *lmv = &obd->u.lmv;
543         struct lmv_tgt_desc  *tgt;
544         int                i;
545         int                rc;
546         int                easize;
547
548         if (lmv->connected)
549                 return 0;
550
551         lmv_init_lock(lmv);
552         if (lmv->connected) {
553                 lmv_init_unlock(lmv);
554                 return 0;
555         }
556
557         if (lmv->desc.ld_tgt_count == 0) {
558                 lmv_init_unlock(lmv);
559                 CERROR("%s: no targets configured.\n", obd->obd_name);
560                 return -EINVAL;
561         }
562
563         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
564                lmv->cluuid.uuid, obd->obd_name);
565
566         LASSERT(lmv->tgts != NULL);
567
568         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
569                 tgt = lmv->tgts[i];
570                 if (tgt == NULL)
571                         continue;
572                 rc = lmv_connect_mdc(obd, tgt);
573                 if (rc)
574                         GOTO(out_disc, rc);
575         }
576
577         lmv_set_timeouts(obd);
578         class_export_put(lmv->exp);
579         lmv->connected = 1;
580         easize = lmv_get_easize(lmv);
581         lmv_init_ea_size(obd->obd_self_export, easize, 0, 0);
582         lmv_init_unlock(lmv);
583         return 0;
584
585  out_disc:
586         while (i-- > 0) {
587                 int rc2;
588                 tgt = lmv->tgts[i];
589                 if (tgt == NULL)
590                         continue;
591                 tgt->ltd_active = 0;
592                 if (tgt->ltd_exp) {
593                         --lmv->desc.ld_active_tgt_count;
594                         rc2 = obd_disconnect(tgt->ltd_exp);
595                         if (rc2) {
596                                 CERROR("LMV target %s disconnect on "
597                                        "MDC idx %d: error %d\n",
598                                        tgt->ltd_uuid.uuid, i, rc2);
599                         }
600                 }
601         }
602         class_disconnect(lmv->exp);
603         lmv_init_unlock(lmv);
604         return rc;
605 }
606
607 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
608 {
609         struct proc_dir_entry  *lmv_proc_dir;
610         struct lmv_obd   *lmv = &obd->u.lmv;
611         struct obd_device      *mdc_obd;
612         int                  rc;
613
614         LASSERT(tgt != NULL);
615         LASSERT(obd != NULL);
616
617         mdc_obd = class_exp2obd(tgt->ltd_exp);
618
619         if (mdc_obd) {
620                 mdc_obd->obd_force = obd->obd_force;
621                 mdc_obd->obd_fail = obd->obd_fail;
622                 mdc_obd->obd_no_recov = obd->obd_no_recov;
623         }
624
625         lmv_proc_dir = obd->obd_proc_private;
626         if (lmv_proc_dir)
627                 lprocfs_remove_proc_entry(mdc_obd->obd_name, lmv_proc_dir);
628
629         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
630         if (rc)
631                 CERROR("Can't finanize fids factory\n");
632
633         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
634                tgt->ltd_exp->exp_obd->obd_name,
635                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
636
637         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
638         rc = obd_disconnect(tgt->ltd_exp);
639         if (rc) {
640                 if (tgt->ltd_active) {
641                         CERROR("Target %s disconnect error %d\n",
642                                tgt->ltd_uuid.uuid, rc);
643                 }
644         }
645
646         lmv_activate_target(lmv, tgt, 0);
647         tgt->ltd_exp = NULL;
648         return 0;
649 }
650
651 static int lmv_disconnect(struct obd_export *exp)
652 {
653         struct obd_device     *obd = class_exp2obd(exp);
654         struct lmv_obd  *lmv = &obd->u.lmv;
655         int                 rc;
656         int                 i;
657
658         if (!lmv->tgts)
659                 goto out_local;
660
661         /*
662          * Only disconnect the underlying layers on the final disconnect.
663          */
664         lmv->refcount--;
665         if (lmv->refcount != 0)
666                 goto out_local;
667
668         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
669                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
670                         continue;
671
672                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
673         }
674
675         if (obd->obd_proc_private)
676                 lprocfs_remove((struct proc_dir_entry **)&obd->obd_proc_private);
677         else
678                 CERROR("/proc/fs/lustre/%s/%s/target_obds missing\n",
679                        obd->obd_type->typ_name, obd->obd_name);
680
681 out_local:
682         /*
683          * This is the case when no real connection is established by
684          * lmv_check_connect().
685          */
686         if (!lmv->connected)
687                 class_export_put(exp);
688         rc = class_disconnect(exp);
689         if (lmv->refcount == 0)
690                 lmv->connected = 0;
691         return rc;
692 }
693
694 static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
695 {
696         struct obd_device       *obddev = class_exp2obd(exp);
697         struct lmv_obd          *lmv = &obddev->u.lmv;
698         struct getinfo_fid2path *gf;
699         struct lmv_tgt_desc     *tgt;
700         struct getinfo_fid2path *remote_gf = NULL;
701         int                     remote_gf_size = 0;
702         int                     rc;
703
704         gf = (struct getinfo_fid2path *)karg;
705         tgt = lmv_find_target(lmv, &gf->gf_fid);
706         if (IS_ERR(tgt))
707                 return PTR_ERR(tgt);
708
709 repeat_fid2path:
710         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
711         if (rc != 0 && rc != -EREMOTE)
712                 GOTO(out_fid2path, rc);
713
714         /* If remote_gf != NULL, it means just building the
715          * path on the remote MDT, copy this path segement to gf */
716         if (remote_gf != NULL) {
717                 struct getinfo_fid2path *ori_gf;
718                 char *ptr;
719
720                 ori_gf = (struct getinfo_fid2path *)karg;
721                 if (strlen(ori_gf->gf_path) +
722                     strlen(gf->gf_path) > ori_gf->gf_pathlen)
723                         GOTO(out_fid2path, rc = -EOVERFLOW);
724
725                 ptr = ori_gf->gf_path;
726
727                 memmove(ptr + strlen(gf->gf_path) + 1, ptr,
728                         strlen(ori_gf->gf_path));
729
730                 strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
731                 ptr += strlen(gf->gf_path);
732                 *ptr = '/';
733         }
734
735         CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
736                tgt->ltd_exp->exp_obd->obd_name,
737                gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
738                gf->gf_linkno);
739
740         if (rc == 0)
741                 GOTO(out_fid2path, rc);
742
743         /* sigh, has to go to another MDT to do path building further */
744         if (remote_gf == NULL) {
745                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
746                 OBD_ALLOC(remote_gf, remote_gf_size);
747                 if (remote_gf == NULL)
748                         GOTO(out_fid2path, rc = -ENOMEM);
749                 remote_gf->gf_pathlen = PATH_MAX;
750         }
751
752         if (!fid_is_sane(&gf->gf_fid)) {
753                 CERROR("%s: invalid FID "DFID": rc = %d\n",
754                        tgt->ltd_exp->exp_obd->obd_name,
755                        PFID(&gf->gf_fid), -EINVAL);
756                 GOTO(out_fid2path, rc = -EINVAL);
757         }
758
759         tgt = lmv_find_target(lmv, &gf->gf_fid);
760         if (IS_ERR(tgt))
761                 GOTO(out_fid2path, rc = -EINVAL);
762
763         remote_gf->gf_fid = gf->gf_fid;
764         remote_gf->gf_recno = -1;
765         remote_gf->gf_linkno = -1;
766         memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
767         gf = remote_gf;
768         goto repeat_fid2path;
769
770 out_fid2path:
771         if (remote_gf != NULL)
772                 OBD_FREE(remote_gf, remote_gf_size);
773         return rc;
774 }
775
776 static int lmv_hsm_req_count(struct lmv_obd *lmv,
777                              const struct hsm_user_request *hur,
778                              const struct lmv_tgt_desc *tgt_mds)
779 {
780         int                     i, nr = 0;
781         struct lmv_tgt_desc    *curr_tgt;
782
783         /* count how many requests must be sent to the given target */
784         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
785                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
786                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
787                         nr++;
788         }
789         return nr;
790 }
791
792 static void lmv_hsm_req_build(struct lmv_obd *lmv,
793                               struct hsm_user_request *hur_in,
794                               const struct lmv_tgt_desc *tgt_mds,
795                               struct hsm_user_request *hur_out)
796 {
797         int                     i, nr_out;
798         struct lmv_tgt_desc    *curr_tgt;
799
800         /* build the hsm_user_request for the given target */
801         hur_out->hur_request = hur_in->hur_request;
802         nr_out = 0;
803         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
804                 curr_tgt = lmv_find_target(lmv,
805                                         &hur_in->hur_user_item[i].hui_fid);
806                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
807                         hur_out->hur_user_item[nr_out] =
808                                 hur_in->hur_user_item[i];
809                         nr_out++;
810                 }
811         }
812         hur_out->hur_request.hr_itemcount = nr_out;
813         memcpy(hur_data(hur_out), hur_data(hur_in),
814                hur_in->hur_request.hr_data_len);
815 }
816
817 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
818                                  struct lustre_kernelcomm *lk, void *uarg)
819 {
820         int     i, rc = 0;
821
822         /* unregister request (call from llapi_hsm_copytool_fini) */
823         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
824                 /* best effort: try to clean as much as possible
825                  * (continue on error) */
826                 obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
827         }
828
829         /* Whatever the result, remove copytool from kuc groups.
830          * Unreached coordinators will get EPIPE on next requests
831          * and will unregister automatically.
832          */
833         rc = libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
834         return rc;
835 }
836
837 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
838                                struct lustre_kernelcomm *lk, void *uarg)
839 {
840         struct file     *filp;
841         int              i, j, err;
842         int              rc = 0;
843         bool             any_set = false;
844
845         /* All or nothing: try to register to all MDS.
846          * In case of failure, unregister from previous MDS,
847          * except if it because of inactive target. */
848         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
849                 err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
850                                    len, lk, uarg);
851                 if (err) {
852                         if (lmv->tgts[i]->ltd_active) {
853                                 /* permanent error */
854                                 CERROR("error: iocontrol MDC %s on MDT"
855                                        "idx %d cmd %x: err = %d\n",
856                                         lmv->tgts[i]->ltd_uuid.uuid,
857                                         i, cmd, err);
858                                 rc = err;
859                                 lk->lk_flags |= LK_FLG_STOP;
860                                 /* unregister from previous MDS */
861                                 for (j = 0; j < i; j++)
862                                         obd_iocontrol(cmd,
863                                                   lmv->tgts[j]->ltd_exp,
864                                                   len, lk, uarg);
865                                 return rc;
866                         }
867                         /* else: transient error.
868                          * kuc will register to the missing MDT
869                          * when it is back */
870                 } else {
871                         any_set = true;
872                 }
873         }
874
875         if (!any_set)
876                 /* no registration done: return error */
877                 return -ENOTCONN;
878
879         /* at least one registration done, with no failure */
880         filp = fget(lk->lk_wfd);
881         if (filp == NULL) {
882                 return -EBADF;
883         }
884         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group, lk->lk_data);
885         if (rc != 0 && filp != NULL)
886                 fput(filp);
887         return rc;
888 }
889
890
891
892
893 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
894                          int len, void *karg, void *uarg)
895 {
896         struct obd_device    *obddev = class_exp2obd(exp);
897         struct lmv_obd       *lmv = &obddev->u.lmv;
898         int                i = 0;
899         int                rc = 0;
900         int                set = 0;
901         int                count = lmv->desc.ld_tgt_count;
902
903         if (count == 0)
904                 return -ENOTTY;
905
906         switch (cmd) {
907         case IOC_OBD_STATFS: {
908                 struct obd_ioctl_data *data = karg;
909                 struct obd_device *mdc_obd;
910                 struct obd_statfs stat_buf = {0};
911                 __u32 index;
912
913                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
914                 if ((index >= count))
915                         return -ENODEV;
916
917                 if (lmv->tgts[index] == NULL ||
918                     lmv->tgts[index]->ltd_active == 0)
919                         return -ENODATA;
920
921                 mdc_obd = class_exp2obd(lmv->tgts[index]->ltd_exp);
922                 if (!mdc_obd)
923                         return -EINVAL;
924
925                 /* copy UUID */
926                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
927                                      min((int) data->ioc_plen2,
928                                          (int) sizeof(struct obd_uuid))))
929                         return -EFAULT;
930
931                 rc = obd_statfs(NULL, lmv->tgts[index]->ltd_exp, &stat_buf,
932                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
933                                 0);
934                 if (rc)
935                         return rc;
936                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
937                                      min((int) data->ioc_plen1,
938                                          (int) sizeof(stat_buf))))
939                         return -EFAULT;
940                 break;
941         }
942         case OBD_IOC_QUOTACTL: {
943                 struct if_quotactl *qctl = karg;
944                 struct lmv_tgt_desc *tgt = NULL;
945                 struct obd_quotactl *oqctl;
946
947                 if (qctl->qc_valid == QC_MDTIDX) {
948                         if (qctl->qc_idx < 0 || count <= qctl->qc_idx)
949                                 return -EINVAL;
950
951                         tgt = lmv->tgts[qctl->qc_idx];
952                         if (tgt == NULL || tgt->ltd_exp == NULL)
953                                 return -EINVAL;
954                 } else if (qctl->qc_valid == QC_UUID) {
955                         for (i = 0; i < count; i++) {
956                                 tgt = lmv->tgts[i];
957                                 if (tgt == NULL)
958                                         continue;
959                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
960                                                      &qctl->obd_uuid))
961                                         continue;
962
963                                 if (tgt->ltd_exp == NULL)
964                                         return -EINVAL;
965
966                                 break;
967                         }
968                 } else {
969                         return -EINVAL;
970                 }
971
972                 if (i >= count)
973                         return -EAGAIN;
974
975                 LASSERT(tgt && tgt->ltd_exp);
976                 OBD_ALLOC_PTR(oqctl);
977                 if (!oqctl)
978                         return -ENOMEM;
979
980                 QCTL_COPY(oqctl, qctl);
981                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
982                 if (rc == 0) {
983                         QCTL_COPY(qctl, oqctl);
984                         qctl->qc_valid = QC_MDTIDX;
985                         qctl->obd_uuid = tgt->ltd_uuid;
986                 }
987                 OBD_FREE_PTR(oqctl);
988                 break;
989         }
990         case OBD_IOC_CHANGELOG_SEND:
991         case OBD_IOC_CHANGELOG_CLEAR: {
992                 struct ioc_changelog *icc = karg;
993
994                 if (icc->icc_mdtindex >= count)
995                         return -ENODEV;
996
997                 if (lmv->tgts[icc->icc_mdtindex] == NULL ||
998                     lmv->tgts[icc->icc_mdtindex]->ltd_exp == NULL ||
999                     lmv->tgts[icc->icc_mdtindex]->ltd_active == 0)
1000                         return -ENODEV;
1001                 rc = obd_iocontrol(cmd, lmv->tgts[icc->icc_mdtindex]->ltd_exp,
1002                                    sizeof(*icc), icc, NULL);
1003                 break;
1004         }
1005         case LL_IOC_GET_CONNECT_FLAGS: {
1006                 if (lmv->tgts[0] == NULL)
1007                         return -ENODATA;
1008                 rc = obd_iocontrol(cmd, lmv->tgts[0]->ltd_exp, len, karg, uarg);
1009                 break;
1010         }
1011         case OBD_IOC_FID2PATH: {
1012                 rc = lmv_fid2path(exp, len, karg, uarg);
1013                 break;
1014         }
1015         case LL_IOC_HSM_STATE_GET:
1016         case LL_IOC_HSM_STATE_SET:
1017         case LL_IOC_HSM_ACTION: {
1018                 struct md_op_data       *op_data = karg;
1019                 struct lmv_tgt_desc     *tgt;
1020
1021                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1022                 if (IS_ERR(tgt))
1023                                 return PTR_ERR(tgt);
1024
1025                 if (tgt->ltd_exp == NULL)
1026                                 return -EINVAL;
1027
1028                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1029                 break;
1030         }
1031         case LL_IOC_HSM_PROGRESS: {
1032                 const struct hsm_progress_kernel *hpk = karg;
1033                 struct lmv_tgt_desc     *tgt;
1034
1035                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
1036                 if (IS_ERR(tgt))
1037                         return PTR_ERR(tgt);
1038                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1039                 break;
1040         }
1041         case LL_IOC_HSM_REQUEST: {
1042                 struct hsm_user_request *hur = karg;
1043                 struct lmv_tgt_desc     *tgt;
1044                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1045
1046                 if (reqcount == 0)
1047                         return 0;
1048
1049                 /* if the request is about a single fid
1050                  * or if there is a single MDS, no need to split
1051                  * the request. */
1052                 if (reqcount == 1 || count == 1) {
1053                         tgt = lmv_find_target(lmv,
1054                                               &hur->hur_user_item[0].hui_fid);
1055                         if (IS_ERR(tgt))
1056                                 return PTR_ERR(tgt);
1057                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1058                 } else {
1059                         /* split fid list to their respective MDS */
1060                         for (i = 0; i < count; i++) {
1061                                 unsigned int            nr, reqlen;
1062                                 int                     rc1;
1063                                 struct hsm_user_request *req;
1064
1065                                 nr = lmv_hsm_req_count(lmv, hur, lmv->tgts[i]);
1066                                 if (nr == 0) /* nothing for this MDS */
1067                                         continue;
1068
1069                                 /* build a request with fids for this MDS */
1070                                 reqlen = offsetof(typeof(*hur),
1071                                                   hur_user_item[nr])
1072                                          + hur->hur_request.hr_data_len;
1073                                 OBD_ALLOC_LARGE(req, reqlen);
1074                                 if (req == NULL)
1075                                         return -ENOMEM;
1076
1077                                 lmv_hsm_req_build(lmv, hur, lmv->tgts[i], req);
1078
1079                                 rc1 = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp,
1080                                                     reqlen, req, uarg);
1081                                 if (rc1 != 0 && rc == 0)
1082                                         rc = rc1;
1083                                 OBD_FREE_LARGE(req, reqlen);
1084                         }
1085                 }
1086                 break;
1087         }
1088         case LL_IOC_LOV_SWAP_LAYOUTS: {
1089                 struct md_op_data       *op_data = karg;
1090                 struct lmv_tgt_desc     *tgt1, *tgt2;
1091
1092                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1093                 if (IS_ERR(tgt1))
1094                         return PTR_ERR(tgt1);
1095
1096                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1097                 if (IS_ERR(tgt2))
1098                         return PTR_ERR(tgt2);
1099
1100                 if ((tgt1->ltd_exp == NULL) || (tgt2->ltd_exp == NULL))
1101                         return -EINVAL;
1102
1103                 /* only files on same MDT can have their layouts swapped */
1104                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1105                         return -EPERM;
1106
1107                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1108                 break;
1109         }
1110         case LL_IOC_HSM_CT_START: {
1111                 struct lustre_kernelcomm *lk = karg;
1112                 if (lk->lk_flags & LK_FLG_STOP)
1113                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1114                 else
1115                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1116                 break;
1117         }
1118         default:
1119                 for (i = 0; i < count; i++) {
1120                         struct obd_device *mdc_obd;
1121                         int err;
1122
1123                         if (lmv->tgts[i] == NULL ||
1124                             lmv->tgts[i]->ltd_exp == NULL)
1125                                 continue;
1126                         /* ll_umount_begin() sets force flag but for lmv, not
1127                          * mdc. Let's pass it through */
1128                         mdc_obd = class_exp2obd(lmv->tgts[i]->ltd_exp);
1129                         mdc_obd->obd_force = obddev->obd_force;
1130                         err = obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len,
1131                                             karg, uarg);
1132                         if (err == -ENODATA && cmd == OBD_IOC_POLL_QUOTACHECK) {
1133                                 return err;
1134                         } else if (err) {
1135                                 if (lmv->tgts[i]->ltd_active) {
1136                                         CERROR("error: iocontrol MDC %s on MDT"
1137                                                "idx %d cmd %x: err = %d\n",
1138                                                 lmv->tgts[i]->ltd_uuid.uuid,
1139                                                 i, cmd, err);
1140                                         if (!rc)
1141                                                 rc = err;
1142                                 }
1143                         } else
1144                                 set = 1;
1145                 }
1146                 if (!set && !rc)
1147                         rc = -EIO;
1148         }
1149         return rc;
1150 }
1151
1152 #if 0
1153 static int lmv_all_chars_policy(int count, const char *name,
1154                                 int len)
1155 {
1156         unsigned int c = 0;
1157
1158         while (len > 0)
1159                 c += name[--len];
1160         c = c % count;
1161         return c;
1162 }
1163
1164 static int lmv_nid_policy(struct lmv_obd *lmv)
1165 {
1166         struct obd_import *imp;
1167         __u32         id;
1168
1169         /*
1170          * XXX: To get nid we assume that underlying obd device is mdc.
1171          */
1172         imp = class_exp2cliimp(lmv->tgts[0].ltd_exp);
1173         id = imp->imp_connection->c_self ^ (imp->imp_connection->c_self >> 32);
1174         return id % lmv->desc.ld_tgt_count;
1175 }
1176
1177 static int lmv_choose_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1178                           placement_policy_t placement)
1179 {
1180         switch (placement) {
1181         case PLACEMENT_CHAR_POLICY:
1182                 return lmv_all_chars_policy(lmv->desc.ld_tgt_count,
1183                                             op_data->op_name,
1184                                             op_data->op_namelen);
1185         case PLACEMENT_NID_POLICY:
1186                 return lmv_nid_policy(lmv);
1187
1188         default:
1189                 break;
1190         }
1191
1192         CERROR("Unsupported placement policy %x\n", placement);
1193         return -EINVAL;
1194 }
1195 #endif
1196
1197 /**
1198  * This is _inode_ placement policy function (not name).
1199  */
1200 static int lmv_placement_policy(struct obd_device *obd,
1201                                 struct md_op_data *op_data,
1202                                 mdsno_t *mds)
1203 {
1204         struct lmv_obd    *lmv = &obd->u.lmv;
1205
1206         LASSERT(mds != NULL);
1207
1208         if (lmv->desc.ld_tgt_count == 1) {
1209                 *mds = 0;
1210                 return 0;
1211         }
1212
1213         /**
1214          * If stripe_offset is provided during setdirstripe
1215          * (setdirstripe -i xx), xx MDS will be choosen.
1216          */
1217         if (op_data->op_cli_flags & CLI_SET_MEA) {
1218                 struct lmv_user_md *lum;
1219
1220                 lum = (struct lmv_user_md *)op_data->op_data;
1221                 if (lum->lum_type == LMV_STRIPE_TYPE &&
1222                     lum->lum_stripe_offset != -1) {
1223                         if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
1224                                 CERROR("%s: Stripe_offset %d > MDT count %d:"
1225                                        " rc = %d\n", obd->obd_name,
1226                                        lum->lum_stripe_offset,
1227                                        lmv->desc.ld_tgt_count, -ERANGE);
1228                                 return -ERANGE;
1229                         }
1230                         *mds = lum->lum_stripe_offset;
1231                         return 0;
1232                 }
1233         }
1234
1235         /* Allocate new fid on target according to operation type and parent
1236          * home mds. */
1237         *mds = op_data->op_mds;
1238         return 0;
1239 }
1240
1241 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
1242                     mdsno_t mds)
1243 {
1244         struct lmv_tgt_desc     *tgt;
1245         int                      rc;
1246
1247         tgt = lmv_get_target(lmv, mds);
1248         if (IS_ERR(tgt))
1249                 return PTR_ERR(tgt);
1250
1251         /*
1252          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1253          * on server that seq in new allocated fid is not yet known.
1254          */
1255         mutex_lock(&tgt->ltd_fid_mutex);
1256
1257         if (tgt->ltd_active == 0 || tgt->ltd_exp == NULL)
1258                 GOTO(out, rc = -ENODEV);
1259
1260         /*
1261          * Asking underlaying tgt layer to allocate new fid.
1262          */
1263         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
1264         if (rc > 0) {
1265                 LASSERT(fid_is_sane(fid));
1266                 rc = 0;
1267         }
1268
1269 out:
1270         mutex_unlock(&tgt->ltd_fid_mutex);
1271         return rc;
1272 }
1273
1274 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
1275                   struct md_op_data *op_data)
1276 {
1277         struct obd_device     *obd = class_exp2obd(exp);
1278         struct lmv_obd  *lmv = &obd->u.lmv;
1279         mdsno_t         mds = 0;
1280         int                 rc;
1281
1282         LASSERT(op_data != NULL);
1283         LASSERT(fid != NULL);
1284
1285         rc = lmv_placement_policy(obd, op_data, &mds);
1286         if (rc) {
1287                 CERROR("Can't get target for allocating fid, "
1288                        "rc %d\n", rc);
1289                 return rc;
1290         }
1291
1292         rc = __lmv_fid_alloc(lmv, fid, mds);
1293         if (rc) {
1294                 CERROR("Can't alloc new fid, rc %d\n", rc);
1295                 return rc;
1296         }
1297
1298         return rc;
1299 }
1300
1301 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1302 {
1303         struct lmv_obd       *lmv = &obd->u.lmv;
1304         struct lprocfs_static_vars  lvars;
1305         struct lmv_desc     *desc;
1306         int                      rc;
1307
1308         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1309                 CERROR("LMV setup requires a descriptor\n");
1310                 return -EINVAL;
1311         }
1312
1313         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1314         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1315                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1316                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1317                 return -EINVAL;
1318         }
1319
1320         OBD_ALLOC(lmv->tgts, sizeof(*lmv->tgts) * 32);
1321         if (lmv->tgts == NULL)
1322                 return -ENOMEM;
1323         lmv->tgts_size = 32;
1324
1325         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1326         lmv->desc.ld_tgt_count = 0;
1327         lmv->desc.ld_active_tgt_count = 0;
1328         lmv->max_cookiesize = 0;
1329         lmv->max_def_easize = 0;
1330         lmv->max_easize = 0;
1331         lmv->lmv_placement = PLACEMENT_CHAR_POLICY;
1332
1333         spin_lock_init(&lmv->lmv_lock);
1334         mutex_init(&lmv->init_mutex);
1335
1336         lprocfs_lmv_init_vars(&lvars);
1337
1338         lprocfs_obd_setup(obd, lvars.obd_vars);
1339 #ifdef LPROCFS
1340         {
1341                 rc = lprocfs_seq_create(obd->obd_proc_entry, "target_obd",
1342                                         0444, &lmv_proc_target_fops, obd);
1343                 if (rc)
1344                         CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1345                                obd->obd_name, rc);
1346        }
1347 #endif
1348         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1349                              LUSTRE_CLI_FLD_HASH_DHT);
1350         if (rc) {
1351                 CERROR("Can't init FLD, err %d\n", rc);
1352                 GOTO(out, rc);
1353         }
1354
1355         return 0;
1356
1357 out:
1358         return rc;
1359 }
1360
1361 static int lmv_cleanup(struct obd_device *obd)
1362 {
1363         struct lmv_obd   *lmv = &obd->u.lmv;
1364
1365         fld_client_fini(&lmv->lmv_fld);
1366         if (lmv->tgts != NULL) {
1367                 int i;
1368                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1369                         if (lmv->tgts[i] == NULL)
1370                                 continue;
1371                         lmv_del_target(lmv, i);
1372                 }
1373                 OBD_FREE(lmv->tgts, sizeof(*lmv->tgts) * lmv->tgts_size);
1374                 lmv->tgts_size = 0;
1375         }
1376         return 0;
1377 }
1378
1379 static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
1380 {
1381         struct lustre_cfg       *lcfg = buf;
1382         struct obd_uuid         obd_uuid;
1383         int                     gen;
1384         __u32                   index;
1385         int                     rc;
1386
1387         switch (lcfg->lcfg_command) {
1388         case LCFG_ADD_MDC:
1389                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1390                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID */
1391                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid))
1392                         GOTO(out, rc = -EINVAL);
1393
1394                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1395
1396                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
1397                         GOTO(out, rc = -EINVAL);
1398                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
1399                         GOTO(out, rc = -EINVAL);
1400                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1401                 GOTO(out, rc);
1402         default:
1403                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1404                 GOTO(out, rc = -EINVAL);
1405         }
1406 out:
1407         return rc;
1408 }
1409
1410 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1411                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1412 {
1413         struct obd_device     *obd = class_exp2obd(exp);
1414         struct lmv_obd  *lmv = &obd->u.lmv;
1415         struct obd_statfs     *temp;
1416         int                 rc = 0;
1417         int                 i;
1418
1419         rc = lmv_check_connect(obd);
1420         if (rc)
1421                 return rc;
1422
1423         OBD_ALLOC(temp, sizeof(*temp));
1424         if (temp == NULL)
1425                 return -ENOMEM;
1426
1427         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1428                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1429                         continue;
1430
1431                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1432                                 max_age, flags);
1433                 if (rc) {
1434                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1435                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1436                                rc);
1437                         GOTO(out_free_temp, rc);
1438                 }
1439
1440                 if (i == 0) {
1441                         *osfs = *temp;
1442                         /* If the statfs is from mount, it will needs
1443                          * retrieve necessary information from MDT0.
1444                          * i.e. mount does not need the merged osfs
1445                          * from all of MDT.
1446                          * And also clients can be mounted as long as
1447                          * MDT0 is in service*/
1448                         if (flags & OBD_STATFS_FOR_MDT0)
1449                                 GOTO(out_free_temp, rc);
1450                 } else {
1451                         osfs->os_bavail += temp->os_bavail;
1452                         osfs->os_blocks += temp->os_blocks;
1453                         osfs->os_ffree += temp->os_ffree;
1454                         osfs->os_files += temp->os_files;
1455                 }
1456         }
1457
1458 out_free_temp:
1459         OBD_FREE(temp, sizeof(*temp));
1460         return rc;
1461 }
1462
1463 static int lmv_getstatus(struct obd_export *exp,
1464                          struct lu_fid *fid,
1465                          struct obd_capa **pc)
1466 {
1467         struct obd_device    *obd = exp->exp_obd;
1468         struct lmv_obd       *lmv = &obd->u.lmv;
1469         int                rc;
1470
1471         rc = lmv_check_connect(obd);
1472         if (rc)
1473                 return rc;
1474
1475         rc = md_getstatus(lmv->tgts[0]->ltd_exp, fid, pc);
1476         return rc;
1477 }
1478
1479 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1480                         struct obd_capa *oc, obd_valid valid, const char *name,
1481                         const char *input, int input_size, int output_size,
1482                         int flags, struct ptlrpc_request **request)
1483 {
1484         struct obd_device      *obd = exp->exp_obd;
1485         struct lmv_obd   *lmv = &obd->u.lmv;
1486         struct lmv_tgt_desc    *tgt;
1487         int                  rc;
1488
1489         rc = lmv_check_connect(obd);
1490         if (rc)
1491                 return rc;
1492
1493         tgt = lmv_find_target(lmv, fid);
1494         if (IS_ERR(tgt))
1495                 return PTR_ERR(tgt);
1496
1497         rc = md_getxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1498                          input_size, output_size, flags, request);
1499
1500         return rc;
1501 }
1502
1503 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1504                         struct obd_capa *oc, obd_valid valid, const char *name,
1505                         const char *input, int input_size, int output_size,
1506                         int flags, __u32 suppgid,
1507                         struct ptlrpc_request **request)
1508 {
1509         struct obd_device      *obd = exp->exp_obd;
1510         struct lmv_obd   *lmv = &obd->u.lmv;
1511         struct lmv_tgt_desc    *tgt;
1512         int                  rc;
1513
1514         rc = lmv_check_connect(obd);
1515         if (rc)
1516                 return rc;
1517
1518         tgt = lmv_find_target(lmv, fid);
1519         if (IS_ERR(tgt))
1520                 return PTR_ERR(tgt);
1521
1522         rc = md_setxattr(tgt->ltd_exp, fid, oc, valid, name, input,
1523                          input_size, output_size, flags, suppgid,
1524                          request);
1525
1526         return rc;
1527 }
1528
1529 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1530                        struct ptlrpc_request **request)
1531 {
1532         struct obd_device       *obd = exp->exp_obd;
1533         struct lmv_obd    *lmv = &obd->u.lmv;
1534         struct lmv_tgt_desc     *tgt;
1535         int                   rc;
1536
1537         rc = lmv_check_connect(obd);
1538         if (rc)
1539                 return rc;
1540
1541         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1542         if (IS_ERR(tgt))
1543                 return PTR_ERR(tgt);
1544
1545         if (op_data->op_flags & MF_GET_MDT_IDX) {
1546                 op_data->op_mds = tgt->ltd_idx;
1547                 return 0;
1548         }
1549
1550         rc = md_getattr(tgt->ltd_exp, op_data, request);
1551
1552         return rc;
1553 }
1554
1555 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1556 {
1557         struct obd_device   *obd = exp->exp_obd;
1558         struct lmv_obd      *lmv = &obd->u.lmv;
1559         int               i;
1560         int               rc;
1561
1562         rc = lmv_check_connect(obd);
1563         if (rc)
1564                 return rc;
1565
1566         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1567
1568         /*
1569          * With DNE every object can have two locks in different namespaces:
1570          * lookup lock in space of MDT storing direntry and update/open lock in
1571          * space of MDT storing inode.
1572          */
1573         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1574                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1575                         continue;
1576                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1577         }
1578
1579         return 0;
1580 }
1581
1582 static int lmv_find_cbdata(struct obd_export *exp, const struct lu_fid *fid,
1583                            ldlm_iterator_t it, void *data)
1584 {
1585         struct obd_device   *obd = exp->exp_obd;
1586         struct lmv_obd      *lmv = &obd->u.lmv;
1587         int               i;
1588         int               rc;
1589
1590         rc = lmv_check_connect(obd);
1591         if (rc)
1592                 return rc;
1593
1594         CDEBUG(D_INODE, "CBDATA for "DFID"\n", PFID(fid));
1595
1596         /*
1597          * With DNE every object can have two locks in different namespaces:
1598          * lookup lock in space of MDT storing direntry and update/open lock in
1599          * space of MDT storing inode.
1600          */
1601         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1602                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL)
1603                         continue;
1604                 rc = md_find_cbdata(lmv->tgts[i]->ltd_exp, fid, it, data);
1605                 if (rc)
1606                         return rc;
1607         }
1608
1609         return rc;
1610 }
1611
1612
1613 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1614                      struct md_open_data *mod, struct ptlrpc_request **request)
1615 {
1616         struct obd_device     *obd = exp->exp_obd;
1617         struct lmv_obd  *lmv = &obd->u.lmv;
1618         struct lmv_tgt_desc   *tgt;
1619         int                 rc;
1620
1621         rc = lmv_check_connect(obd);
1622         if (rc)
1623                 return rc;
1624
1625         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1626         if (IS_ERR(tgt))
1627                 return PTR_ERR(tgt);
1628
1629         CDEBUG(D_INODE, "CLOSE "DFID"\n", PFID(&op_data->op_fid1));
1630         rc = md_close(tgt->ltd_exp, op_data, mod, request);
1631         return rc;
1632 }
1633
1634 struct lmv_tgt_desc
1635 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1636                 struct lu_fid *fid)
1637 {
1638         struct lmv_tgt_desc *tgt;
1639
1640         tgt = lmv_find_target(lmv, fid);
1641         if (IS_ERR(tgt))
1642                 return tgt;
1643
1644         op_data->op_mds = tgt->ltd_idx;
1645
1646         return tgt;
1647 }
1648
1649 int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1650                const void *data, int datalen, int mode, __u32 uid,
1651                __u32 gid, cfs_cap_t cap_effective, __u64 rdev,
1652                struct ptlrpc_request **request)
1653 {
1654         struct obd_device       *obd = exp->exp_obd;
1655         struct lmv_obd    *lmv = &obd->u.lmv;
1656         struct lmv_tgt_desc     *tgt;
1657         int                   rc;
1658
1659         rc = lmv_check_connect(obd);
1660         if (rc)
1661                 return rc;
1662
1663         if (!lmv->desc.ld_active_tgt_count)
1664                 return -EIO;
1665
1666         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1667         if (IS_ERR(tgt))
1668                 return PTR_ERR(tgt);
1669
1670         rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
1671         if (rc)
1672                 return rc;
1673
1674         CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
1675                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1676                op_data->op_mds);
1677
1678         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1679         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1680                        cap_effective, rdev, request);
1681
1682         if (rc == 0) {
1683                 if (*request == NULL)
1684                         return rc;
1685                 CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
1686         }
1687         return rc;
1688 }
1689
1690 static int lmv_done_writing(struct obd_export *exp,
1691                             struct md_op_data *op_data,
1692                             struct md_open_data *mod)
1693 {
1694         struct obd_device     *obd = exp->exp_obd;
1695         struct lmv_obd  *lmv = &obd->u.lmv;
1696         struct lmv_tgt_desc   *tgt;
1697         int                 rc;
1698
1699         rc = lmv_check_connect(obd);
1700         if (rc)
1701                 return rc;
1702
1703         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1704         if (IS_ERR(tgt))
1705                 return PTR_ERR(tgt);
1706
1707         rc = md_done_writing(tgt->ltd_exp, op_data, mod);
1708         return rc;
1709 }
1710
1711 static int
1712 lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1713                    struct lookup_intent *it, struct md_op_data *op_data,
1714                    struct lustre_handle *lockh, void *lmm, int lmmsize,
1715                    int extra_lock_flags)
1716 {
1717         struct ptlrpc_request      *req = it->d.lustre.it_data;
1718         struct obd_device         *obd = exp->exp_obd;
1719         struct lmv_obd       *lmv = &obd->u.lmv;
1720         struct lustre_handle    plock;
1721         struct lmv_tgt_desc     *tgt;
1722         struct md_op_data         *rdata;
1723         struct lu_fid          fid1;
1724         struct mdt_body     *body;
1725         int                      rc = 0;
1726         int                      pmode;
1727
1728         body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1729         LASSERT(body != NULL);
1730
1731         if (!(body->valid & OBD_MD_MDS))
1732                 return 0;
1733
1734         CDEBUG(D_INODE, "REMOTE_ENQUEUE '%s' on "DFID" -> "DFID"\n",
1735                LL_IT2STR(it), PFID(&op_data->op_fid1), PFID(&body->fid1));
1736
1737         /*
1738          * We got LOOKUP lock, but we really need attrs.
1739          */
1740         pmode = it->d.lustre.it_lock_mode;
1741         LASSERT(pmode != 0);
1742         memcpy(&plock, lockh, sizeof(plock));
1743         it->d.lustre.it_lock_mode = 0;
1744         it->d.lustre.it_data = NULL;
1745         fid1 = body->fid1;
1746
1747         it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
1748         ptlrpc_req_finished(req);
1749
1750         tgt = lmv_find_target(lmv, &fid1);
1751         if (IS_ERR(tgt))
1752                 GOTO(out, rc = PTR_ERR(tgt));
1753
1754         OBD_ALLOC_PTR(rdata);
1755         if (rdata == NULL)
1756                 GOTO(out, rc = -ENOMEM);
1757
1758         rdata->op_fid1 = fid1;
1759         rdata->op_bias = MDS_CROSS_REF;
1760
1761         rc = md_enqueue(tgt->ltd_exp, einfo, it, rdata, lockh,
1762                         lmm, lmmsize, NULL, extra_lock_flags);
1763         OBD_FREE_PTR(rdata);
1764 out:
1765         ldlm_lock_decref(&plock, pmode);
1766         return rc;
1767 }
1768
1769 static int
1770 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1771             struct lookup_intent *it, struct md_op_data *op_data,
1772             struct lustre_handle *lockh, void *lmm, int lmmsize,
1773             struct ptlrpc_request **req, __u64 extra_lock_flags)
1774 {
1775         struct obd_device       *obd = exp->exp_obd;
1776         struct lmv_obd     *lmv = &obd->u.lmv;
1777         struct lmv_tgt_desc      *tgt;
1778         int                    rc;
1779
1780         rc = lmv_check_connect(obd);
1781         if (rc)
1782                 return rc;
1783
1784         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
1785                LL_IT2STR(it), PFID(&op_data->op_fid1));
1786
1787         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1788         if (IS_ERR(tgt))
1789                 return PTR_ERR(tgt);
1790
1791         CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
1792                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1793
1794         rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
1795                         lmm, lmmsize, req, extra_lock_flags);
1796
1797         if (rc == 0 && it && it->it_op == IT_OPEN) {
1798                 rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
1799                                         lmm, lmmsize, extra_lock_flags);
1800         }
1801         return rc;
1802 }
1803
1804 static int
1805 lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
1806                  struct ptlrpc_request **request)
1807 {
1808         struct ptlrpc_request   *req = NULL;
1809         struct obd_device       *obd = exp->exp_obd;
1810         struct lmv_obd    *lmv = &obd->u.lmv;
1811         struct lmv_tgt_desc     *tgt;
1812         struct mdt_body  *body;
1813         int                   rc;
1814
1815         rc = lmv_check_connect(obd);
1816         if (rc)
1817                 return rc;
1818
1819         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1820         if (IS_ERR(tgt))
1821                 return PTR_ERR(tgt);
1822
1823         CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
1824                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
1825                tgt->ltd_idx);
1826
1827         rc = md_getattr_name(tgt->ltd_exp, op_data, request);
1828         if (rc != 0)
1829                 return rc;
1830
1831         body = req_capsule_server_get(&(*request)->rq_pill,
1832                                       &RMF_MDT_BODY);
1833         LASSERT(body != NULL);
1834
1835         if (body->valid & OBD_MD_MDS) {
1836                 struct lu_fid rid = body->fid1;
1837                 CDEBUG(D_INODE, "Request attrs for "DFID"\n",
1838                        PFID(&rid));
1839
1840                 tgt = lmv_find_target(lmv, &rid);
1841                 if (IS_ERR(tgt)) {
1842                         ptlrpc_req_finished(*request);
1843                         return PTR_ERR(tgt);
1844                 }
1845
1846                 op_data->op_fid1 = rid;
1847                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1848                 op_data->op_namelen = 0;
1849                 op_data->op_name = NULL;
1850                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1851                 ptlrpc_req_finished(*request);
1852                 *request = req;
1853         }
1854
1855         return rc;
1856 }
1857
1858 #define md_op_data_fid(op_data, fl)                  \
1859         (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
1860          fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
1861          fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
1862          fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
1863          NULL)
1864
1865 static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
1866                             int op_tgt, ldlm_mode_t mode, int bits, int flag)
1867 {
1868         struct lu_fid     *fid = md_op_data_fid(op_data, flag);
1869         struct obd_device      *obd = exp->exp_obd;
1870         struct lmv_obd   *lmv = &obd->u.lmv;
1871         struct lmv_tgt_desc    *tgt;
1872         ldlm_policy_data_t      policy = {{0}};
1873         int                  rc = 0;
1874
1875         if (!fid_is_sane(fid))
1876                 return 0;
1877
1878         tgt = lmv_find_target(lmv, fid);
1879         if (IS_ERR(tgt))
1880                 return PTR_ERR(tgt);
1881
1882         if (tgt->ltd_idx != op_tgt) {
1883                 CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
1884                 policy.l_inodebits.bits = bits;
1885                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1886                                       mode, LCF_ASYNC, NULL);
1887         } else {
1888                 CDEBUG(D_INODE,
1889                        "EARLY_CANCEL skip operation target %d on "DFID"\n",
1890                        op_tgt, PFID(fid));
1891                 op_data->op_flags |= flag;
1892                 rc = 0;
1893         }
1894
1895         return rc;
1896 }
1897
1898 /*
1899  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1900  * op_data->op_fid2
1901  */
1902 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1903                     struct ptlrpc_request **request)
1904 {
1905         struct obd_device       *obd = exp->exp_obd;
1906         struct lmv_obd    *lmv = &obd->u.lmv;
1907         struct lmv_tgt_desc     *tgt;
1908         int                   rc;
1909
1910         rc = lmv_check_connect(obd);
1911         if (rc)
1912                 return rc;
1913
1914         LASSERT(op_data->op_namelen != 0);
1915
1916         CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
1917                PFID(&op_data->op_fid2), op_data->op_namelen,
1918                op_data->op_name, PFID(&op_data->op_fid1));
1919
1920         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1921         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1922         op_data->op_cap = cfs_curproc_cap_pack();
1923         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1924         if (IS_ERR(tgt))
1925                 return PTR_ERR(tgt);
1926
1927         /*
1928          * Cancel UPDATE lock on child (fid1).
1929          */
1930         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1931         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
1932                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1933         if (rc != 0)
1934                 return rc;
1935
1936         rc = md_link(tgt->ltd_exp, op_data, request);
1937
1938         return rc;
1939 }
1940
1941 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1942                       const char *old, int oldlen, const char *new, int newlen,
1943                       struct ptlrpc_request **request)
1944 {
1945         struct obd_device       *obd = exp->exp_obd;
1946         struct lmv_obd    *lmv = &obd->u.lmv;
1947         struct lmv_tgt_desc     *src_tgt;
1948         struct lmv_tgt_desc     *tgt_tgt;
1949         int                     rc;
1950
1951         LASSERT(oldlen != 0);
1952
1953         CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
1954                oldlen, old, PFID(&op_data->op_fid1),
1955                newlen, new, PFID(&op_data->op_fid2));
1956
1957         rc = lmv_check_connect(obd);
1958         if (rc)
1959                 return rc;
1960
1961         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1962         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1963         op_data->op_cap = cfs_curproc_cap_pack();
1964         src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1965         if (IS_ERR(src_tgt))
1966                 return PTR_ERR(src_tgt);
1967
1968         tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1969         if (IS_ERR(tgt_tgt))
1970                 return PTR_ERR(tgt_tgt);
1971         /*
1972          * LOOKUP lock on src child (fid3) should also be cancelled for
1973          * src_tgt in mdc_rename.
1974          */
1975         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1976
1977         /*
1978          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1979          * own target.
1980          */
1981         rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1982                               LCK_EX, MDS_INODELOCK_UPDATE,
1983                               MF_MDC_CANCEL_FID2);
1984
1985         /*
1986          * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
1987          */
1988         if (rc == 0) {
1989                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1990                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1991                                       MF_MDC_CANCEL_FID4);
1992         }
1993
1994         /*
1995          * Cancel all the locks on tgt child (fid4).
1996          */
1997         if (rc == 0)
1998                 rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
1999                                       LCK_EX, MDS_INODELOCK_FULL,
2000                                       MF_MDC_CANCEL_FID4);
2001
2002         if (rc == 0)
2003                 rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
2004                                new, newlen, request);
2005         return rc;
2006 }
2007
2008 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
2009                        void *ea, int ealen, void *ea2, int ea2len,
2010                        struct ptlrpc_request **request,
2011                        struct md_open_data **mod)
2012 {
2013         struct obd_device       *obd = exp->exp_obd;
2014         struct lmv_obd    *lmv = &obd->u.lmv;
2015         struct lmv_tgt_desc     *tgt;
2016         int                   rc = 0;
2017
2018         rc = lmv_check_connect(obd);
2019         if (rc)
2020                 return rc;
2021
2022         CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
2023                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
2024
2025         op_data->op_flags |= MF_MDC_CANCEL_FID1;
2026         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2027         if (IS_ERR(tgt))
2028                 return PTR_ERR(tgt);
2029
2030         rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
2031                         ea2len, request, mod);
2032
2033         return rc;
2034 }
2035
2036 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2037                     struct obd_capa *oc, struct ptlrpc_request **request)
2038 {
2039         struct obd_device        *obd = exp->exp_obd;
2040         struct lmv_obd      *lmv = &obd->u.lmv;
2041         struct lmv_tgt_desc       *tgt;
2042         int                     rc;
2043
2044         rc = lmv_check_connect(obd);
2045         if (rc)
2046                 return rc;
2047
2048         tgt = lmv_find_target(lmv, fid);
2049         if (IS_ERR(tgt))
2050                 return PTR_ERR(tgt);
2051
2052         rc = md_sync(tgt->ltd_exp, fid, oc, request);
2053         return rc;
2054 }
2055
2056 /*
2057  * Adjust a set of pages, each page containing an array of lu_dirpages,
2058  * so that each page can be used as a single logical lu_dirpage.
2059  *
2060  * A lu_dirpage is laid out as follows, where s = ldp_hash_start,
2061  * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a
2062  * struct lu_dirent.  It has size up to LU_PAGE_SIZE. The ldp_hash_end
2063  * value is used as a cookie to request the next lu_dirpage in a
2064  * directory listing that spans multiple pages (two in this example):
2065  *   ________
2066  *  |   |
2067  * .|--------v-------   -----.
2068  * |s|e|f|p|ent|ent| ... |ent|
2069  * '--|--------------   -----'   Each CFS_PAGE contains a single
2070  *    '------.             lu_dirpage.
2071  * .---------v-------   -----.
2072  * |s|e|f|p|ent| 0 | ... | 0 |
2073  * '-----------------   -----'
2074  *
2075  * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is
2076  * larger than LU_PAGE_SIZE, a single host page may contain multiple
2077  * lu_dirpages. After reading the lu_dirpages from the MDS, the
2078  * ldp_hash_end of the first lu_dirpage refers to the one immediately
2079  * after it in the same CFS_PAGE (arrows simplified for brevity, but
2080  * in general e0==s1, e1==s2, etc.):
2081  *
2082  * .--------------------   -----.
2083  * |s0|e0|f0|p|ent|ent| ... |ent|
2084  * |---v----------------   -----|
2085  * |s1|e1|f1|p|ent|ent| ... |ent|
2086  * |---v----------------   -----|  Here, each CFS_PAGE contains
2087  *           ...                 multiple lu_dirpages.
2088  * |---v----------------   -----|
2089  * |s'|e'|f'|p|ent|ent| ... |ent|
2090  * '---|----------------   -----'
2091  *     v
2092  * .----------------------------.
2093  * |    next CFS_PAGE       |
2094  *
2095  * This structure is transformed into a single logical lu_dirpage as follows:
2096  *
2097  * - Replace e0 with e' so the request for the next lu_dirpage gets the page
2098  *   labeled 'next CFS_PAGE'.
2099  *
2100  * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether
2101  *   a hash collision with the next page exists.
2102  *
2103  * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span
2104  *   to the first entry of the next lu_dirpage.
2105  */
2106 #if PAGE_CACHE_SIZE > LU_PAGE_SIZE
2107 static void lmv_adjust_dirpages(struct page **pages, int ncfspgs, int nlupgs)
2108 {
2109         int i;
2110
2111         for (i = 0; i < ncfspgs; i++) {
2112                 struct lu_dirpage       *dp = kmap(pages[i]);
2113                 struct lu_dirpage       *first = dp;
2114                 struct lu_dirent        *end_dirent = NULL;
2115                 struct lu_dirent        *ent;
2116                 __u64                   hash_end = dp->ldp_hash_end;
2117                 __u32                   flags = dp->ldp_flags;
2118
2119                 while (--nlupgs > 0) {
2120                         ent = lu_dirent_start(dp);
2121                         for (end_dirent = ent; ent != NULL;
2122                              end_dirent = ent, ent = lu_dirent_next(ent));
2123
2124                         /* Advance dp to next lu_dirpage. */
2125                         dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
2126
2127                         /* Check if we've reached the end of the CFS_PAGE. */
2128                         if (!((unsigned long)dp & ~CFS_PAGE_MASK))
2129                                 break;
2130
2131                         /* Save the hash and flags of this lu_dirpage. */
2132                         hash_end = dp->ldp_hash_end;
2133                         flags = dp->ldp_flags;
2134
2135                         /* Check if lu_dirpage contains no entries. */
2136                         if (!end_dirent)
2137                                 break;
2138
2139                         /* Enlarge the end entry lde_reclen from 0 to
2140                          * first entry of next lu_dirpage. */
2141                         LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0);
2142                         end_dirent->lde_reclen =
2143                                 cpu_to_le16((char *)(dp->ldp_entries) -
2144                                             (char *)end_dirent);
2145                 }
2146
2147                 first->ldp_hash_end = hash_end;
2148                 first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
2149                 first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
2150
2151                 kunmap(pages[i]);
2152         }
2153         LASSERTF(nlupgs == 0, "left = %d", nlupgs);
2154 }
2155 #else
2156 #define lmv_adjust_dirpages(pages, ncfspgs, nlupgs) do {} while (0)
2157 #endif  /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */
2158
2159 static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
2160                         struct page **pages, struct ptlrpc_request **request)
2161 {
2162         struct obd_device       *obd = exp->exp_obd;
2163         struct lmv_obd          *lmv = &obd->u.lmv;
2164         __u64                   offset = op_data->op_offset;
2165         int                     rc;
2166         int                     ncfspgs; /* pages read in PAGE_CACHE_SIZE */
2167         int                     nlupgs; /* pages read in LU_PAGE_SIZE */
2168         struct lmv_tgt_desc     *tgt;
2169
2170         rc = lmv_check_connect(obd);
2171         if (rc)
2172                 return rc;
2173
2174         CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
2175                offset, PFID(&op_data->op_fid1));
2176
2177         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2178         if (IS_ERR(tgt))
2179                 return PTR_ERR(tgt);
2180
2181         rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
2182         if (rc != 0)
2183                 return rc;
2184
2185         ncfspgs = ((*request)->rq_bulk->bd_nob_transferred + PAGE_CACHE_SIZE - 1)
2186                  >> PAGE_CACHE_SHIFT;
2187         nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
2188         LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
2189         LASSERT(ncfspgs > 0 && ncfspgs <= op_data->op_npages);
2190
2191         CDEBUG(D_INODE, "read %d(%d)/%d pages\n", ncfspgs, nlupgs,
2192                op_data->op_npages);
2193
2194         lmv_adjust_dirpages(pages, ncfspgs, nlupgs);
2195
2196         return rc;
2197 }
2198
2199 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2200                       struct ptlrpc_request **request)
2201 {
2202         struct obd_device       *obd = exp->exp_obd;
2203         struct lmv_obd    *lmv = &obd->u.lmv;
2204         struct lmv_tgt_desc     *tgt = NULL;
2205         struct mdt_body         *body;
2206         int                  rc;
2207
2208         rc = lmv_check_connect(obd);
2209         if (rc)
2210                 return rc;
2211 retry:
2212         /* Send unlink requests to the MDT where the child is located */
2213         if (likely(!fid_is_zero(&op_data->op_fid2)))
2214                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2215         else
2216                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2217         if (IS_ERR(tgt))
2218                 return PTR_ERR(tgt);
2219
2220         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2221         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2222         op_data->op_cap = cfs_curproc_cap_pack();
2223
2224         /*
2225          * If child's fid is given, cancel unused locks for it if it is from
2226          * another export than parent.
2227          *
2228          * LOOKUP lock for child (fid3) should also be cancelled on parent
2229          * tgt_tgt in mdc_unlink().
2230          */
2231         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2232
2233         /*
2234          * Cancel FULL locks on child (fid3).
2235          */
2236         rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
2237                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2238
2239         if (rc != 0)
2240                 return rc;
2241
2242         CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
2243                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2244
2245         rc = md_unlink(tgt->ltd_exp, op_data, request);
2246         if (rc != 0 && rc != -EREMOTE)
2247                 return rc;
2248
2249         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2250         if (body == NULL)
2251                 return -EPROTO;
2252
2253         /* Not cross-ref case, just get out of here. */
2254         if (likely(!(body->valid & OBD_MD_MDS)))
2255                 return 0;
2256
2257         CDEBUG(D_INODE, "%s: try unlink to another MDT for "DFID"\n",
2258                exp->exp_obd->obd_name, PFID(&body->fid1));
2259
2260         /* This is a remote object, try remote MDT, Note: it may
2261          * try more than 1 time here, Considering following case
2262          * /mnt/lustre is root on MDT0, remote1 is on MDT1
2263          * 1. Initially A does not know where remote1 is, it send
2264          *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2265          *    resend unlink RPC to MDT1 (retry 1st time).
2266          *
2267          * 2. During the unlink RPC in flight,
2268          *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2269          *    and create new remote1, but on MDT0
2270          *
2271          * 3. MDT1 get unlink RPC(from A), then do remote lock on
2272          *    /mnt/lustre, then lookup get fid of remote1, and find
2273          *    it is remote dir again, and replay -EREMOTE again.
2274          *
2275          * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2276          *
2277          * In theory, it might try unlimited time here, but it should
2278          * be very rare case.  */
2279         op_data->op_fid2 = body->fid1;
2280         ptlrpc_req_finished(*request);
2281         *request = NULL;
2282
2283         goto retry;
2284 }
2285
2286 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2287 {
2288         struct lmv_obd *lmv = &obd->u.lmv;
2289         int rc = 0;
2290
2291         switch (stage) {
2292         case OBD_CLEANUP_EARLY:
2293                 /* XXX: here should be calling obd_precleanup() down to
2294                  * stack. */
2295                 break;
2296         case OBD_CLEANUP_EXPORTS:
2297                 fld_client_proc_fini(&lmv->lmv_fld);
2298                 lprocfs_obd_cleanup(obd);
2299                 break;
2300         default:
2301                 break;
2302         }
2303         return rc;
2304 }
2305
2306 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2307                         __u32 keylen, void *key, __u32 *vallen, void *val,
2308                         struct lov_stripe_md *lsm)
2309 {
2310         struct obd_device       *obd;
2311         struct lmv_obd    *lmv;
2312         int                   rc = 0;
2313
2314         obd = class_exp2obd(exp);
2315         if (obd == NULL) {
2316                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2317                        exp->exp_handle.h_cookie);
2318                 return -EINVAL;
2319         }
2320
2321         lmv = &obd->u.lmv;
2322         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2323                 struct lmv_tgt_desc *tgt;
2324                 int i;
2325
2326                 rc = lmv_check_connect(obd);
2327                 if (rc)
2328                         return rc;
2329
2330                 LASSERT(*vallen == sizeof(__u32));
2331                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2332                         tgt = lmv->tgts[i];
2333                         /*
2334                          * All tgts should be connected when this gets called.
2335                          */
2336                         if (tgt == NULL || tgt->ltd_exp == NULL)
2337                                 continue;
2338
2339                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2340                                           vallen, val, NULL))
2341                                 return 0;
2342                 }
2343                 return -EINVAL;
2344         } else if (KEY_IS(KEY_MAX_EASIZE) || KEY_IS(KEY_CONN_DATA)) {
2345                 rc = lmv_check_connect(obd);
2346                 if (rc)
2347                         return rc;
2348
2349                 /*
2350                  * Forwarding this request to first MDS, it should know LOV
2351                  * desc.
2352                  */
2353                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2354                                   vallen, val, NULL);
2355                 if (!rc && KEY_IS(KEY_CONN_DATA))
2356                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2357                 return rc;
2358         } else if (KEY_IS(KEY_TGT_COUNT)) {
2359                 *((int *)val) = lmv->desc.ld_tgt_count;
2360                 return 0;
2361         }
2362
2363         CDEBUG(D_IOCTL, "Invalid key\n");
2364         return -EINVAL;
2365 }
2366
2367 int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2368                        obd_count keylen, void *key, obd_count vallen,
2369                        void *val, struct ptlrpc_request_set *set)
2370 {
2371         struct lmv_tgt_desc    *tgt;
2372         struct obd_device      *obd;
2373         struct lmv_obd   *lmv;
2374         int rc = 0;
2375
2376         obd = class_exp2obd(exp);
2377         if (obd == NULL) {
2378                 CDEBUG(D_IOCTL, "Invalid client cookie "LPX64"\n",
2379                        exp->exp_handle.h_cookie);
2380                 return -EINVAL;
2381         }
2382         lmv = &obd->u.lmv;
2383
2384         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX)) {
2385                 int i, err = 0;
2386
2387                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2388                         tgt = lmv->tgts[i];
2389
2390                         if (tgt == NULL || tgt->ltd_exp == NULL)
2391                                 continue;
2392
2393                         err = obd_set_info_async(env, tgt->ltd_exp,
2394                                                  keylen, key, vallen, val, set);
2395                         if (err && rc == 0)
2396                                 rc = err;
2397                 }
2398
2399                 return rc;
2400         }
2401
2402         return -EINVAL;
2403 }
2404
2405 int lmv_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
2406                struct lov_stripe_md *lsm)
2407 {
2408         struct obd_device        *obd = class_exp2obd(exp);
2409         struct lmv_obd      *lmv = &obd->u.lmv;
2410         struct lmv_stripe_md      *meap;
2411         struct lmv_stripe_md      *lsmp;
2412         int                     mea_size;
2413         int                     i;
2414
2415         mea_size = lmv_get_easize(lmv);
2416         if (!lmmp)
2417                 return mea_size;
2418
2419         if (*lmmp && !lsm) {
2420                 OBD_FREE_LARGE(*lmmp, mea_size);
2421                 *lmmp = NULL;
2422                 return 0;
2423         }
2424
2425         if (*lmmp == NULL) {
2426                 OBD_ALLOC_LARGE(*lmmp, mea_size);
2427                 if (*lmmp == NULL)
2428                         return -ENOMEM;
2429         }
2430
2431         if (!lsm)
2432                 return mea_size;
2433
2434         lsmp = (struct lmv_stripe_md *)lsm;
2435         meap = (struct lmv_stripe_md *)*lmmp;
2436
2437         if (lsmp->mea_magic != MEA_MAGIC_LAST_CHAR &&
2438             lsmp->mea_magic != MEA_MAGIC_ALL_CHARS)
2439                 return -EINVAL;
2440
2441         meap->mea_magic = cpu_to_le32(lsmp->mea_magic);
2442         meap->mea_count = cpu_to_le32(lsmp->mea_count);
2443         meap->mea_master = cpu_to_le32(lsmp->mea_master);
2444
2445         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2446                 meap->mea_ids[i] = lsmp->mea_ids[i];
2447                 fid_cpu_to_le(&meap->mea_ids[i], &lsmp->mea_ids[i]);
2448         }
2449
2450         return mea_size;
2451 }
2452
2453 int lmv_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
2454                  struct lov_mds_md *lmm, int lmm_size)
2455 {
2456         struct obd_device         *obd = class_exp2obd(exp);
2457         struct lmv_stripe_md      **tmea = (struct lmv_stripe_md **)lsmp;
2458         struct lmv_stripe_md       *mea = (struct lmv_stripe_md *)lmm;
2459         struct lmv_obd       *lmv = &obd->u.lmv;
2460         int                      mea_size;
2461         int                      i;
2462         __u32                  magic;
2463
2464         mea_size = lmv_get_easize(lmv);
2465         if (lsmp == NULL)
2466                 return mea_size;
2467
2468         if (*lsmp != NULL && lmm == NULL) {
2469                 OBD_FREE_LARGE(*tmea, mea_size);
2470                 *lsmp = NULL;
2471                 return 0;
2472         }
2473
2474         LASSERT(mea_size == lmm_size);
2475
2476         OBD_ALLOC_LARGE(*tmea, mea_size);
2477         if (*tmea == NULL)
2478                 return -ENOMEM;
2479
2480         if (!lmm)
2481                 return mea_size;
2482
2483         if (mea->mea_magic == MEA_MAGIC_LAST_CHAR ||
2484             mea->mea_magic == MEA_MAGIC_ALL_CHARS ||
2485             mea->mea_magic == MEA_MAGIC_HASH_SEGMENT)
2486         {
2487                 magic = le32_to_cpu(mea->mea_magic);
2488         } else {
2489                 /*
2490                  * Old mea is not handled here.
2491                  */
2492                 CERROR("Old not supportable EA is found\n");
2493                 LBUG();
2494         }
2495
2496         (*tmea)->mea_magic = magic;
2497         (*tmea)->mea_count = le32_to_cpu(mea->mea_count);
2498         (*tmea)->mea_master = le32_to_cpu(mea->mea_master);
2499
2500         for (i = 0; i < (*tmea)->mea_count; i++) {
2501                 (*tmea)->mea_ids[i] = mea->mea_ids[i];
2502                 fid_le_to_cpu(&(*tmea)->mea_ids[i], &(*tmea)->mea_ids[i]);
2503         }
2504         return mea_size;
2505 }
2506
2507 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2508                              ldlm_policy_data_t *policy, ldlm_mode_t mode,
2509                              ldlm_cancel_flags_t flags, void *opaque)
2510 {
2511         struct obd_device       *obd = exp->exp_obd;
2512         struct lmv_obd    *lmv = &obd->u.lmv;
2513         int                   rc = 0;
2514         int                   err;
2515         int                   i;
2516
2517         LASSERT(fid != NULL);
2518
2519         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2520                 if (lmv->tgts[i] == NULL || lmv->tgts[i]->ltd_exp == NULL ||
2521                     lmv->tgts[i]->ltd_active == 0)
2522                         continue;
2523
2524                 err = md_cancel_unused(lmv->tgts[i]->ltd_exp, fid,
2525                                        policy, mode, flags, opaque);
2526                 if (!rc)
2527                         rc = err;
2528         }
2529         return rc;
2530 }
2531
2532 int lmv_set_lock_data(struct obd_export *exp, __u64 *lockh, void *data,
2533                       __u64 *bits)
2534 {
2535         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2536         int                   rc;
2537
2538         rc =  md_set_lock_data(lmv->tgts[0]->ltd_exp, lockh, data, bits);
2539         return rc;
2540 }
2541
2542 ldlm_mode_t lmv_lock_match(struct obd_export *exp, __u64 flags,
2543                            const struct lu_fid *fid, ldlm_type_t type,
2544                            ldlm_policy_data_t *policy, ldlm_mode_t mode,
2545                            struct lustre_handle *lockh)
2546 {
2547         struct obd_device       *obd = exp->exp_obd;
2548         struct lmv_obd    *lmv = &obd->u.lmv;
2549         ldlm_mode_t           rc;
2550         int                   i;
2551
2552         CDEBUG(D_INODE, "Lock match for "DFID"\n", PFID(fid));
2553
2554         /*
2555          * With CMD every object can have two locks in different namespaces:
2556          * lookup lock in space of mds storing direntry and update/open lock in
2557          * space of mds storing inode. Thus we check all targets, not only that
2558          * one fid was created in.
2559          */
2560         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2561                 if (lmv->tgts[i] == NULL ||
2562                     lmv->tgts[i]->ltd_exp == NULL ||
2563                     lmv->tgts[i]->ltd_active == 0)
2564                         continue;
2565
2566                 rc = md_lock_match(lmv->tgts[i]->ltd_exp, flags, fid,
2567                                    type, policy, mode, lockh);
2568                 if (rc)
2569                         return rc;
2570         }
2571
2572         return 0;
2573 }
2574
2575 int lmv_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req,
2576                       struct obd_export *dt_exp, struct obd_export *md_exp,
2577                       struct lustre_md *md)
2578 {
2579         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2580
2581         return md_get_lustre_md(lmv->tgts[0]->ltd_exp, req, dt_exp, md_exp, md);
2582 }
2583
2584 int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2585 {
2586         struct obd_device       *obd = exp->exp_obd;
2587         struct lmv_obd    *lmv = &obd->u.lmv;
2588
2589         if (md->mea)
2590                 obd_free_memmd(exp, (void *)&md->mea);
2591         return md_free_lustre_md(lmv->tgts[0]->ltd_exp, md);
2592 }
2593
2594 int lmv_set_open_replay_data(struct obd_export *exp,
2595                              struct obd_client_handle *och,
2596                              struct ptlrpc_request *open_req)
2597 {
2598         struct obd_device       *obd = exp->exp_obd;
2599         struct lmv_obd    *lmv = &obd->u.lmv;
2600         struct lmv_tgt_desc     *tgt;
2601
2602         tgt = lmv_find_target(lmv, &och->och_fid);
2603         if (IS_ERR(tgt))
2604                 return PTR_ERR(tgt);
2605
2606         return md_set_open_replay_data(tgt->ltd_exp, och, open_req);
2607 }
2608
2609 int lmv_clear_open_replay_data(struct obd_export *exp,
2610                                struct obd_client_handle *och)
2611 {
2612         struct obd_device       *obd = exp->exp_obd;
2613         struct lmv_obd    *lmv = &obd->u.lmv;
2614         struct lmv_tgt_desc     *tgt;
2615
2616         tgt = lmv_find_target(lmv, &och->och_fid);
2617         if (IS_ERR(tgt))
2618                 return PTR_ERR(tgt);
2619
2620         return md_clear_open_replay_data(tgt->ltd_exp, och);
2621 }
2622
2623 static int lmv_get_remote_perm(struct obd_export *exp,
2624                                const struct lu_fid *fid,
2625                                struct obd_capa *oc, __u32 suppgid,
2626                                struct ptlrpc_request **request)
2627 {
2628         struct obd_device       *obd = exp->exp_obd;
2629         struct lmv_obd    *lmv = &obd->u.lmv;
2630         struct lmv_tgt_desc     *tgt;
2631         int                   rc;
2632
2633         rc = lmv_check_connect(obd);
2634         if (rc)
2635                 return rc;
2636
2637         tgt = lmv_find_target(lmv, fid);
2638         if (IS_ERR(tgt))
2639                 return PTR_ERR(tgt);
2640
2641         rc = md_get_remote_perm(tgt->ltd_exp, fid, oc, suppgid, request);
2642         return rc;
2643 }
2644
2645 static int lmv_renew_capa(struct obd_export *exp, struct obd_capa *oc,
2646                           renew_capa_cb_t cb)
2647 {
2648         struct obd_device       *obd = exp->exp_obd;
2649         struct lmv_obd    *lmv = &obd->u.lmv;
2650         struct lmv_tgt_desc     *tgt;
2651         int                   rc;
2652
2653         rc = lmv_check_connect(obd);
2654         if (rc)
2655                 return rc;
2656
2657         tgt = lmv_find_target(lmv, &oc->c_capa.lc_fid);
2658         if (IS_ERR(tgt))
2659                 return PTR_ERR(tgt);
2660
2661         rc = md_renew_capa(tgt->ltd_exp, oc, cb);
2662         return rc;
2663 }
2664
2665 int lmv_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
2666                     const struct req_msg_field *field, struct obd_capa **oc)
2667 {
2668         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2669
2670         return md_unpack_capa(lmv->tgts[0]->ltd_exp, req, field, oc);
2671 }
2672
2673 int lmv_intent_getattr_async(struct obd_export *exp,
2674                              struct md_enqueue_info *minfo,
2675                              struct ldlm_enqueue_info *einfo)
2676 {
2677         struct md_op_data       *op_data = &minfo->mi_data;
2678         struct obd_device       *obd = exp->exp_obd;
2679         struct lmv_obd    *lmv = &obd->u.lmv;
2680         struct lmv_tgt_desc     *tgt = NULL;
2681         int                   rc;
2682
2683         rc = lmv_check_connect(obd);
2684         if (rc)
2685                 return rc;
2686
2687         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2688         if (IS_ERR(tgt))
2689                 return PTR_ERR(tgt);
2690
2691         rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
2692         return rc;
2693 }
2694
2695 int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2696                         struct lu_fid *fid, __u64 *bits)
2697 {
2698         struct obd_device       *obd = exp->exp_obd;
2699         struct lmv_obd    *lmv = &obd->u.lmv;
2700         struct lmv_tgt_desc     *tgt;
2701         int                   rc;
2702
2703         rc = lmv_check_connect(obd);
2704         if (rc)
2705                 return rc;
2706
2707         tgt = lmv_find_target(lmv, fid);
2708         if (IS_ERR(tgt))
2709                 return PTR_ERR(tgt);
2710
2711         rc = md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2712         return rc;
2713 }
2714
2715 /**
2716  * For lmv, only need to send request to master MDT, and the master MDT will
2717  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2718  * we directly fetch data from the slave MDTs.
2719  */
2720 int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2721                  struct obd_quotactl *oqctl)
2722 {
2723         struct obd_device   *obd = class_exp2obd(exp);
2724         struct lmv_obd      *lmv = &obd->u.lmv;
2725         struct lmv_tgt_desc *tgt = lmv->tgts[0];
2726         int               rc = 0, i;
2727         __u64           curspace, curinodes;
2728
2729         if (!lmv->desc.ld_tgt_count || !tgt->ltd_active) {
2730                 CERROR("master lmv inactive\n");
2731                 return -EIO;
2732         }
2733
2734         if (oqctl->qc_cmd != Q_GETOQUOTA) {
2735                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
2736                 return rc;
2737         }
2738
2739         curspace = curinodes = 0;
2740         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2741                 int err;
2742                 tgt = lmv->tgts[i];
2743
2744                 if (tgt == NULL || tgt->ltd_exp == NULL || tgt->ltd_active == 0)
2745                         continue;
2746                 if (!tgt->ltd_active) {
2747                         CDEBUG(D_HA, "mdt %d is inactive.\n", i);
2748                         continue;
2749                 }
2750
2751                 err = obd_quotactl(tgt->ltd_exp, oqctl);
2752                 if (err) {
2753                         CERROR("getquota on mdt %d failed. %d\n", i, err);
2754                         if (!rc)
2755                                 rc = err;
2756                 } else {
2757                         curspace += oqctl->qc_dqblk.dqb_curspace;
2758                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
2759                 }
2760         }
2761         oqctl->qc_dqblk.dqb_curspace = curspace;
2762         oqctl->qc_dqblk.dqb_curinodes = curinodes;
2763
2764         return rc;
2765 }
2766
2767 int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
2768                    struct obd_quotactl *oqctl)
2769 {
2770         struct obd_device   *obd = class_exp2obd(exp);
2771         struct lmv_obd      *lmv = &obd->u.lmv;
2772         struct lmv_tgt_desc *tgt;
2773         int               i, rc = 0;
2774
2775         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2776                 int err;
2777                 tgt = lmv->tgts[i];
2778                 if (tgt == NULL || tgt->ltd_exp == NULL || !tgt->ltd_active) {
2779                         CERROR("lmv idx %d inactive\n", i);
2780                         return -EIO;
2781                 }
2782
2783                 err = obd_quotacheck(tgt->ltd_exp, oqctl);
2784                 if (err && !rc)
2785                         rc = err;
2786         }
2787
2788         return rc;
2789 }
2790
2791 struct obd_ops lmv_obd_ops = {
2792         .o_owner                = THIS_MODULE,
2793         .o_setup                = lmv_setup,
2794         .o_cleanup            = lmv_cleanup,
2795         .o_precleanup      = lmv_precleanup,
2796         .o_process_config       = lmv_process_config,
2797         .o_connect            = lmv_connect,
2798         .o_disconnect      = lmv_disconnect,
2799         .o_statfs              = lmv_statfs,
2800         .o_get_info          = lmv_get_info,
2801         .o_set_info_async       = lmv_set_info_async,
2802         .o_packmd              = lmv_packmd,
2803         .o_unpackmd          = lmv_unpackmd,
2804         .o_notify              = lmv_notify,
2805         .o_get_uuid          = lmv_get_uuid,
2806         .o_iocontrol        = lmv_iocontrol,
2807         .o_quotacheck      = lmv_quotacheck,
2808         .o_quotactl          = lmv_quotactl
2809 };
2810
2811 struct md_ops lmv_md_ops = {
2812         .m_getstatus        = lmv_getstatus,
2813         .m_null_inode           = lmv_null_inode,
2814         .m_find_cbdata    = lmv_find_cbdata,
2815         .m_close                = lmv_close,
2816         .m_create              = lmv_create,
2817         .m_done_writing  = lmv_done_writing,
2818         .m_enqueue            = lmv_enqueue,
2819         .m_getattr            = lmv_getattr,
2820         .m_getxattr          = lmv_getxattr,
2821         .m_getattr_name  = lmv_getattr_name,
2822         .m_intent_lock    = lmv_intent_lock,
2823         .m_link          = lmv_link,
2824         .m_rename              = lmv_rename,
2825         .m_setattr            = lmv_setattr,
2826         .m_setxattr          = lmv_setxattr,
2827         .m_sync          = lmv_sync,
2828         .m_readpage          = lmv_readpage,
2829         .m_unlink              = lmv_unlink,
2830         .m_init_ea_size  = lmv_init_ea_size,
2831         .m_cancel_unused        = lmv_cancel_unused,
2832         .m_set_lock_data        = lmv_set_lock_data,
2833         .m_lock_match      = lmv_lock_match,
2834         .m_get_lustre_md        = lmv_get_lustre_md,
2835         .m_free_lustre_md       = lmv_free_lustre_md,
2836         .m_set_open_replay_data = lmv_set_open_replay_data,
2837         .m_clear_open_replay_data = lmv_clear_open_replay_data,
2838         .m_renew_capa      = lmv_renew_capa,
2839         .m_unpack_capa    = lmv_unpack_capa,
2840         .m_get_remote_perm      = lmv_get_remote_perm,
2841         .m_intent_getattr_async = lmv_intent_getattr_async,
2842         .m_revalidate_lock      = lmv_revalidate_lock
2843 };
2844
2845 int __init lmv_init(void)
2846 {
2847         struct lprocfs_static_vars lvars;
2848         int                     rc;
2849
2850         lprocfs_lmv_init_vars(&lvars);
2851
2852         rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
2853                                  lvars.module_vars, LUSTRE_LMV_NAME, NULL);
2854         return rc;
2855 }
2856
2857 static void lmv_exit(void)
2858 {
2859         class_unregister_type(LUSTRE_LMV_NAME);
2860 }
2861
2862 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2863 MODULE_DESCRIPTION("Lustre Logical Metadata Volume OBD driver");
2864 MODULE_LICENSE("GPL");
2865
2866 module_init(lmv_init);
2867 module_exit(lmv_exit);