staging: lustre: remove ENTRY macro
[firefly-linux-kernel-4.4.55.git] / drivers / staging / lustre / lustre / lov / lov_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LOV
38
39 #include <linux/libcfs/libcfs.h>
40
41 #include <obd_class.h>
42 #include <obd_lov.h>
43 #include <lustre/lustre_idl.h>
44
45 #include "lov_internal.h"
46
47 static void lov_init_set(struct lov_request_set *set)
48 {
49         set->set_count = 0;
50         atomic_set(&set->set_completes, 0);
51         atomic_set(&set->set_success, 0);
52         atomic_set(&set->set_finish_checked, 0);
53         set->set_cookies = 0;
54         INIT_LIST_HEAD(&set->set_list);
55         atomic_set(&set->set_refcount, 1);
56         init_waitqueue_head(&set->set_waitq);
57         spin_lock_init(&set->set_lock);
58 }
59
60 void lov_finish_set(struct lov_request_set *set)
61 {
62         struct list_head *pos, *n;
63
64         LASSERT(set);
65         list_for_each_safe(pos, n, &set->set_list) {
66                 struct lov_request *req = list_entry(pos,
67                                                          struct lov_request,
68                                                          rq_link);
69                 list_del_init(&req->rq_link);
70
71                 if (req->rq_oi.oi_oa)
72                         OBDO_FREE(req->rq_oi.oi_oa);
73                 if (req->rq_oi.oi_md)
74                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
75                 if (req->rq_oi.oi_osfs)
76                         OBD_FREE(req->rq_oi.oi_osfs,
77                                  sizeof(*req->rq_oi.oi_osfs));
78                 OBD_FREE(req, sizeof(*req));
79         }
80
81         if (set->set_pga) {
82                 int len = set->set_oabufs * sizeof(*set->set_pga);
83                 OBD_FREE_LARGE(set->set_pga, len);
84         }
85         if (set->set_lockh)
86                 lov_llh_put(set->set_lockh);
87
88         OBD_FREE(set, sizeof(*set));
89         EXIT;
90 }
91
92 int lov_set_finished(struct lov_request_set *set, int idempotent)
93 {
94         int completes = atomic_read(&set->set_completes);
95
96         CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
97
98         if (completes == set->set_count) {
99                 if (idempotent)
100                         return 1;
101                 if (atomic_inc_return(&set->set_finish_checked) == 1)
102                         return 1;
103         }
104         return 0;
105 }
106
107 void lov_update_set(struct lov_request_set *set,
108                     struct lov_request *req, int rc)
109 {
110         req->rq_complete = 1;
111         req->rq_rc = rc;
112
113         atomic_inc(&set->set_completes);
114         if (rc == 0)
115                 atomic_inc(&set->set_success);
116
117         wake_up(&set->set_waitq);
118 }
119
120 int lov_update_common_set(struct lov_request_set *set,
121                           struct lov_request *req, int rc)
122 {
123         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
124
125         lov_update_set(set, req, rc);
126
127         /* grace error on inactive ost */
128         if (rc && !(lov->lov_tgts[req->rq_idx] &&
129                     lov->lov_tgts[req->rq_idx]->ltd_active))
130                 rc = 0;
131
132         /* FIXME in raid1 regime, should return 0 */
133         RETURN(rc);
134 }
135
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 {
138         list_add_tail(&req->rq_link, &set->set_list);
139         set->set_count++;
140         req->rq_rqset = set;
141 }
142
143 static int lov_check_set(struct lov_obd *lov, int idx)
144 {
145         int rc = 0;
146         mutex_lock(&lov->lov_lock);
147
148         if (lov->lov_tgts[idx] == NULL ||
149             lov->lov_tgts[idx]->ltd_active ||
150             (lov->lov_tgts[idx]->ltd_exp != NULL &&
151              class_exp2cliimp(lov->lov_tgts[idx]->ltd_exp)->imp_connect_tried))
152                 rc = 1;
153
154         mutex_unlock(&lov->lov_lock);
155         return rc;
156 }
157
158 /* Check if the OSC connection exists and is active.
159  * If the OSC has not yet had a chance to connect to the OST the first time,
160  * wait once for it to connect instead of returning an error.
161  */
162 int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
163 {
164         wait_queue_head_t waitq;
165         struct l_wait_info lwi;
166         struct lov_tgt_desc *tgt;
167         int rc = 0;
168
169         mutex_lock(&lov->lov_lock);
170
171         tgt = lov->lov_tgts[ost_idx];
172
173         if (unlikely(tgt == NULL))
174                 GOTO(out, rc = 0);
175
176         if (likely(tgt->ltd_active))
177                 GOTO(out, rc = 1);
178
179         if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried)
180                 GOTO(out, rc = 0);
181
182         mutex_unlock(&lov->lov_lock);
183
184         init_waitqueue_head(&waitq);
185         lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
186                                    cfs_time_seconds(1), NULL, NULL);
187
188         rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
189         if (tgt != NULL && tgt->ltd_active)
190                 return 1;
191
192         return 0;
193
194 out:
195         mutex_unlock(&lov->lov_lock);
196         return rc;
197 }
198
199 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
200                                struct lov_oinfo *loi, int flags,
201                                struct ost_lvb *lvb, __u32 mode, int rc);
202
203 static int lov_update_enqueue_lov(struct obd_export *exp,
204                                   struct lustre_handle *lov_lockhp,
205                                   struct lov_oinfo *loi, int flags, int idx,
206                                   struct ost_id *oi, int rc)
207 {
208         struct lov_obd *lov = &exp->exp_obd->u.lov;
209
210         if (rc != ELDLM_OK &&
211             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
212                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
213                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
214                         /* -EUSERS used by OST to report file contention */
215                         if (rc != -EINTR && rc != -EUSERS)
216                                 CERROR("%s: enqueue objid "DOSTID" subobj"
217                                        DOSTID" on OST idx %d: rc %d\n",
218                                        exp->exp_obd->obd_name,
219                                        POSTID(oi), POSTID(&loi->loi_oi),
220                                        loi->loi_ost_idx, rc);
221                 } else
222                         rc = ELDLM_OK;
223         }
224         return rc;
225 }
226
227 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
228 {
229         struct lov_request_set *set = req->rq_rqset;
230         struct lustre_handle *lov_lockhp;
231         struct obd_info *oi = set->set_oi;
232         struct lov_oinfo *loi;
233
234         LASSERT(oi != NULL);
235
236         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
237         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
238
239         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
240          * and that copy can be arbitrarily out of date.
241          *
242          * The LOV API is due for a serious rewriting anyways, and this
243          * can be addressed then. */
244
245         lov_stripe_lock(oi->oi_md);
246         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
247                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
248         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
249                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
250         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
251                                     req->rq_idx, &oi->oi_md->lsm_oi, rc);
252         lov_stripe_unlock(oi->oi_md);
253         lov_update_set(set, req, rc);
254         RETURN(rc);
255 }
256
257 /* The callback for osc_enqueue that updates lov info for every OSC request. */
258 static int cb_update_enqueue(void *cookie, int rc)
259 {
260         struct obd_info *oinfo = cookie;
261         struct ldlm_enqueue_info *einfo;
262         struct lov_request *lovreq;
263
264         lovreq = container_of(oinfo, struct lov_request, rq_oi);
265         einfo = lovreq->rq_rqset->set_ei;
266         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
267 }
268
269 static int enqueue_done(struct lov_request_set *set, __u32 mode)
270 {
271         struct lov_request *req;
272         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
273         int completes = atomic_read(&set->set_completes);
274         int rc = 0;
275
276         /* enqueue/match success, just return */
277         if (completes && completes == atomic_read(&set->set_success))
278                 RETURN(0);
279
280         /* cancel enqueued/matched locks */
281         list_for_each_entry(req, &set->set_list, rq_link) {
282                 struct lustre_handle *lov_lockhp;
283
284                 if (!req->rq_complete || req->rq_rc)
285                         continue;
286
287                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
288                 LASSERT(lov_lockhp);
289                 if (!lustre_handle_is_used(lov_lockhp))
290                         continue;
291
292                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
293                                 req->rq_oi.oi_md, mode, lov_lockhp);
294                 if (rc && lov->lov_tgts[req->rq_idx] &&
295                     lov->lov_tgts[req->rq_idx]->ltd_active)
296                         CERROR("%s: cancelling obdjid "DOSTID" on OST"
297                                "idx %d error: rc = %d\n",
298                                set->set_exp->exp_obd->obd_name,
299                                POSTID(&req->rq_oi.oi_md->lsm_oi),
300                                req->rq_idx, rc);
301         }
302         if (set->set_lockh)
303                 lov_llh_put(set->set_lockh);
304         RETURN(rc);
305 }
306
307 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
308                          struct ptlrpc_request_set *rqset)
309 {
310         int ret = 0;
311
312         if (set == NULL)
313                 RETURN(0);
314         LASSERT(set->set_exp);
315         /* Do enqueue_done only for sync requests and if any request
316          * succeeded. */
317         if (!rqset) {
318                 if (rc)
319                         atomic_set(&set->set_completes, 0);
320                 ret = enqueue_done(set, mode);
321         } else if (set->set_lockh)
322                 lov_llh_put(set->set_lockh);
323
324         lov_put_reqset(set);
325
326         RETURN(rc ? rc : ret);
327 }
328
329 static void lov_llh_addref(void *llhp)
330 {
331         struct lov_lock_handles *llh = llhp;
332
333         atomic_inc(&llh->llh_refcount);
334         CDEBUG(D_INFO, "GETting llh %p : new refcount %d\n", llh,
335                atomic_read(&llh->llh_refcount));
336 }
337
338 static struct portals_handle_ops lov_handle_ops = {
339         .hop_addref = lov_llh_addref,
340         .hop_free   = NULL,
341 };
342
343 static struct lov_lock_handles *lov_llh_new(struct lov_stripe_md *lsm)
344 {
345         struct lov_lock_handles *llh;
346
347         OBD_ALLOC(llh, sizeof *llh +
348                   sizeof(*llh->llh_handles) * lsm->lsm_stripe_count);
349         if (llh == NULL)
350                 return NULL;
351
352         atomic_set(&llh->llh_refcount, 2);
353         llh->llh_stripe_count = lsm->lsm_stripe_count;
354         INIT_LIST_HEAD(&llh->llh_handle.h_link);
355         class_handle_hash(&llh->llh_handle, &lov_handle_ops);
356
357         return llh;
358 }
359
360 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
361                          struct ldlm_enqueue_info *einfo,
362                          struct lov_request_set **reqset)
363 {
364         struct lov_obd *lov = &exp->exp_obd->u.lov;
365         struct lov_request_set *set;
366         int i, rc = 0;
367
368         OBD_ALLOC(set, sizeof(*set));
369         if (set == NULL)
370                 RETURN(-ENOMEM);
371         lov_init_set(set);
372
373         set->set_exp = exp;
374         set->set_oi = oinfo;
375         set->set_ei = einfo;
376         set->set_lockh = lov_llh_new(oinfo->oi_md);
377         if (set->set_lockh == NULL)
378                 GOTO(out_set, rc = -ENOMEM);
379         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
380
381         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
382                 struct lov_oinfo *loi;
383                 struct lov_request *req;
384                 obd_off start, end;
385
386                 loi = oinfo->oi_md->lsm_oinfo[i];
387                 if (!lov_stripe_intersects(oinfo->oi_md, i,
388                                            oinfo->oi_policy.l_extent.start,
389                                            oinfo->oi_policy.l_extent.end,
390                                            &start, &end))
391                         continue;
392
393                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
394                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
395                         continue;
396                 }
397
398                 OBD_ALLOC(req, sizeof(*req));
399                 if (req == NULL)
400                         GOTO(out_set, rc = -ENOMEM);
401
402                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
403                         sizeof(struct lov_oinfo *) +
404                         sizeof(struct lov_oinfo);
405                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
406                 if (req->rq_oi.oi_md == NULL) {
407                         OBD_FREE(req, sizeof(*req));
408                         GOTO(out_set, rc = -ENOMEM);
409                 }
410                 req->rq_oi.oi_md->lsm_oinfo[0] =
411                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
412                         sizeof(struct lov_oinfo *);
413
414                 /* Set lov request specific parameters. */
415                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
416                 req->rq_oi.oi_cb_up = cb_update_enqueue;
417                 req->rq_oi.oi_flags = oinfo->oi_flags;
418
419                 LASSERT(req->rq_oi.oi_lockh);
420
421                 req->rq_oi.oi_policy.l_extent.gid =
422                         oinfo->oi_policy.l_extent.gid;
423                 req->rq_oi.oi_policy.l_extent.start = start;
424                 req->rq_oi.oi_policy.l_extent.end = end;
425
426                 req->rq_idx = loi->loi_ost_idx;
427                 req->rq_stripe = i;
428
429                 /* XXX LOV STACKING: submd should be from the subobj */
430                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
431                 req->rq_oi.oi_md->lsm_stripe_count = 0;
432                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
433                         loi->loi_kms_valid;
434                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
435                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
436
437                 lov_set_add_req(req, set);
438         }
439         if (!set->set_count)
440                 GOTO(out_set, rc = -EIO);
441         *reqset = set;
442         RETURN(0);
443 out_set:
444         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
445         RETURN(rc);
446 }
447
448 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
449 {
450         int rc = 0;
451
452         if (set == NULL)
453                 RETURN(0);
454         LASSERT(set->set_exp);
455         rc = enqueue_done(set, mode);
456         if ((set->set_count == atomic_read(&set->set_success)) &&
457             (flags & LDLM_FL_TEST_LOCK))
458                 lov_llh_put(set->set_lockh);
459
460         lov_put_reqset(set);
461
462         RETURN(rc);
463 }
464
465 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
466                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
467                        __u32 mode, struct lustre_handle *lockh,
468                        struct lov_request_set **reqset)
469 {
470         struct lov_obd *lov = &exp->exp_obd->u.lov;
471         struct lov_request_set *set;
472         int i, rc = 0;
473
474         OBD_ALLOC(set, sizeof(*set));
475         if (set == NULL)
476                 RETURN(-ENOMEM);
477         lov_init_set(set);
478
479         set->set_exp = exp;
480         set->set_oi = oinfo;
481         set->set_oi->oi_md = lsm;
482         set->set_lockh = lov_llh_new(lsm);
483         if (set->set_lockh == NULL)
484                 GOTO(out_set, rc = -ENOMEM);
485         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
486
487         for (i = 0; i < lsm->lsm_stripe_count; i++){
488                 struct lov_oinfo *loi;
489                 struct lov_request *req;
490                 obd_off start, end;
491
492                 loi = lsm->lsm_oinfo[i];
493                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
494                                            policy->l_extent.end, &start, &end))
495                         continue;
496
497                 /* FIXME raid1 should grace this error */
498                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
499                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
500                         GOTO(out_set, rc = -EIO);
501                 }
502
503                 OBD_ALLOC(req, sizeof(*req));
504                 if (req == NULL)
505                         GOTO(out_set, rc = -ENOMEM);
506
507                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
508                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
509                 if (req->rq_oi.oi_md == NULL) {
510                         OBD_FREE(req, sizeof(*req));
511                         GOTO(out_set, rc = -ENOMEM);
512                 }
513
514                 req->rq_oi.oi_policy.l_extent.start = start;
515                 req->rq_oi.oi_policy.l_extent.end = end;
516                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
517
518                 req->rq_idx = loi->loi_ost_idx;
519                 req->rq_stripe = i;
520
521                 /* XXX LOV STACKING: submd should be from the subobj */
522                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
523                 req->rq_oi.oi_md->lsm_stripe_count = 0;
524
525                 lov_set_add_req(req, set);
526         }
527         if (!set->set_count)
528                 GOTO(out_set, rc = -EIO);
529         *reqset = set;
530         RETURN(rc);
531 out_set:
532         lov_fini_match_set(set, mode, 0);
533         RETURN(rc);
534 }
535
536 int lov_fini_cancel_set(struct lov_request_set *set)
537 {
538         int rc = 0;
539
540         if (set == NULL)
541                 RETURN(0);
542
543         LASSERT(set->set_exp);
544         if (set->set_lockh)
545                 lov_llh_put(set->set_lockh);
546
547         lov_put_reqset(set);
548
549         RETURN(rc);
550 }
551
552 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
553                         struct lov_stripe_md *lsm, __u32 mode,
554                         struct lustre_handle *lockh,
555                         struct lov_request_set **reqset)
556 {
557         struct lov_request_set *set;
558         int i, rc = 0;
559
560         OBD_ALLOC(set, sizeof(*set));
561         if (set == NULL)
562                 RETURN(-ENOMEM);
563         lov_init_set(set);
564
565         set->set_exp = exp;
566         set->set_oi = oinfo;
567         set->set_oi->oi_md = lsm;
568         set->set_lockh = lov_handle2llh(lockh);
569         if (set->set_lockh == NULL) {
570                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
571                 GOTO(out_set, rc = -EINVAL);
572         }
573         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
574
575         for (i = 0; i < lsm->lsm_stripe_count; i++){
576                 struct lov_request *req;
577                 struct lustre_handle *lov_lockhp;
578                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
579
580                 lov_lockhp = set->set_lockh->llh_handles + i;
581                 if (!lustre_handle_is_used(lov_lockhp)) {
582                         CDEBUG(D_INFO, "lov idx %d subobj "DOSTID" no lock\n",
583                                loi->loi_ost_idx, POSTID(&loi->loi_oi));
584                         continue;
585                 }
586
587                 OBD_ALLOC(req, sizeof(*req));
588                 if (req == NULL)
589                         GOTO(out_set, rc = -ENOMEM);
590
591                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
592                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
593                 if (req->rq_oi.oi_md == NULL) {
594                         OBD_FREE(req, sizeof(*req));
595                         GOTO(out_set, rc = -ENOMEM);
596                 }
597
598                 req->rq_idx = loi->loi_ost_idx;
599                 req->rq_stripe = i;
600
601                 /* XXX LOV STACKING: submd should be from the subobj */
602                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
603                 req->rq_oi.oi_md->lsm_stripe_count = 0;
604
605                 lov_set_add_req(req, set);
606         }
607         if (!set->set_count)
608                 GOTO(out_set, rc = -EIO);
609         *reqset = set;
610         RETURN(rc);
611 out_set:
612         lov_fini_cancel_set(set);
613         RETURN(rc);
614 }
615 static int common_attr_done(struct lov_request_set *set)
616 {
617         struct list_head *pos;
618         struct lov_request *req;
619         struct obdo *tmp_oa;
620         int rc = 0, attrset = 0;
621
622         LASSERT(set->set_oi != NULL);
623
624         if (set->set_oi->oi_oa == NULL)
625                 RETURN(0);
626
627         if (!atomic_read(&set->set_success))
628                 RETURN(-EIO);
629
630         OBDO_ALLOC(tmp_oa);
631         if (tmp_oa == NULL)
632                 GOTO(out, rc = -ENOMEM);
633
634         list_for_each (pos, &set->set_list) {
635                 req = list_entry(pos, struct lov_request, rq_link);
636
637                 if (!req->rq_complete || req->rq_rc)
638                         continue;
639                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
640                         continue;
641                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
642                                 req->rq_oi.oi_oa->o_valid,
643                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
644         }
645         if (!attrset) {
646                 CERROR("No stripes had valid attrs\n");
647                 rc = -EIO;
648         }
649         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
650             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
651                 /* When we take attributes of some epoch, we require all the
652                  * ost to be active. */
653                 CERROR("Not all the stripes had valid attrs\n");
654                 GOTO(out, rc = -EIO);
655         }
656
657         tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
658         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
659 out:
660         if (tmp_oa)
661                 OBDO_FREE(tmp_oa);
662         RETURN(rc);
663
664 }
665
666 static int brw_done(struct lov_request_set *set)
667 {
668         struct lov_stripe_md *lsm = set->set_oi->oi_md;
669         struct lov_oinfo     *loi = NULL;
670         struct list_head *pos;
671         struct lov_request *req;
672
673         list_for_each (pos, &set->set_list) {
674                 req = list_entry(pos, struct lov_request, rq_link);
675
676                 if (!req->rq_complete || req->rq_rc)
677                         continue;
678
679                 loi = lsm->lsm_oinfo[req->rq_stripe];
680
681                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
682                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
683         }
684
685         RETURN(0);
686 }
687
688 int lov_fini_brw_set(struct lov_request_set *set)
689 {
690         int rc = 0;
691
692         if (set == NULL)
693                 RETURN(0);
694         LASSERT(set->set_exp);
695         if (atomic_read(&set->set_completes)) {
696                 rc = brw_done(set);
697                 /* FIXME update qos data here */
698         }
699         lov_put_reqset(set);
700
701         RETURN(rc);
702 }
703
704 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
705                      obd_count oa_bufs, struct brw_page *pga,
706                      struct obd_trans_info *oti,
707                      struct lov_request_set **reqset)
708 {
709         struct {
710                 obd_count       index;
711                 obd_count       count;
712                 obd_count       off;
713         } *info = NULL;
714         struct lov_request_set *set;
715         struct lov_obd *lov = &exp->exp_obd->u.lov;
716         int rc = 0, i, shift;
717
718         OBD_ALLOC(set, sizeof(*set));
719         if (set == NULL)
720                 RETURN(-ENOMEM);
721         lov_init_set(set);
722
723         set->set_exp = exp;
724         set->set_oti = oti;
725         set->set_oi = oinfo;
726         set->set_oabufs = oa_bufs;
727         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
728         if (!set->set_pga)
729                 GOTO(out, rc = -ENOMEM);
730
731         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
732         if (!info)
733                 GOTO(out, rc = -ENOMEM);
734
735         /* calculate the page count for each stripe */
736         for (i = 0; i < oa_bufs; i++) {
737                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
738                 info[stripe].count++;
739         }
740
741         /* alloc and initialize lov request */
742         shift = 0;
743         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
744                 struct lov_oinfo *loi = NULL;
745                 struct lov_request *req;
746
747                 if (info[i].count == 0)
748                         continue;
749
750                 loi = oinfo->oi_md->lsm_oinfo[i];
751                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
752                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
753                         GOTO(out, rc = -EIO);
754                 }
755
756                 OBD_ALLOC(req, sizeof(*req));
757                 if (req == NULL)
758                         GOTO(out, rc = -ENOMEM);
759
760                 OBDO_ALLOC(req->rq_oi.oi_oa);
761                 if (req->rq_oi.oi_oa == NULL) {
762                         OBD_FREE(req, sizeof(*req));
763                         GOTO(out, rc = -ENOMEM);
764                 }
765
766                 if (oinfo->oi_oa) {
767                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
768                                sizeof(*req->rq_oi.oi_oa));
769                 }
770                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
771                 req->rq_oi.oi_oa->o_stripe_idx = i;
772
773                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
774                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
775                 if (req->rq_oi.oi_md == NULL) {
776                         OBDO_FREE(req->rq_oi.oi_oa);
777                         OBD_FREE(req, sizeof(*req));
778                         GOTO(out, rc = -ENOMEM);
779                 }
780
781                 req->rq_idx = loi->loi_ost_idx;
782                 req->rq_stripe = i;
783
784                 /* XXX LOV STACKING */
785                 req->rq_oi.oi_md->lsm_oi = loi->loi_oi;
786                 req->rq_oabufs = info[i].count;
787                 req->rq_pgaidx = shift;
788                 shift += req->rq_oabufs;
789
790                 /* remember the index for sort brw_page array */
791                 info[i].index = req->rq_pgaidx;
792
793                 req->rq_oi.oi_capa = oinfo->oi_capa;
794
795                 lov_set_add_req(req, set);
796         }
797         if (!set->set_count)
798                 GOTO(out, rc = -EIO);
799
800         /* rotate & sort the brw_page array */
801         for (i = 0; i < oa_bufs; i++) {
802                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
803
804                 shift = info[stripe].index + info[stripe].off;
805                 LASSERT(shift < oa_bufs);
806                 set->set_pga[shift] = pga[i];
807                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
808                                   &set->set_pga[shift].off);
809                 info[stripe].off++;
810         }
811 out:
812         if (info)
813                 OBD_FREE_LARGE(info,
814                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
815
816         if (rc == 0)
817                 *reqset = set;
818         else
819                 lov_fini_brw_set(set);
820
821         RETURN(rc);
822 }
823
824 int lov_fini_getattr_set(struct lov_request_set *set)
825 {
826         int rc = 0;
827
828         if (set == NULL)
829                 RETURN(0);
830         LASSERT(set->set_exp);
831         if (atomic_read(&set->set_completes))
832                 rc = common_attr_done(set);
833
834         lov_put_reqset(set);
835
836         RETURN(rc);
837 }
838
839 /* The callback for osc_getattr_async that finilizes a request info when a
840  * response is received. */
841 static int cb_getattr_update(void *cookie, int rc)
842 {
843         struct obd_info *oinfo = cookie;
844         struct lov_request *lovreq;
845         lovreq = container_of(oinfo, struct lov_request, rq_oi);
846         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
847 }
848
849 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
850                          struct lov_request_set **reqset)
851 {
852         struct lov_request_set *set;
853         struct lov_obd *lov = &exp->exp_obd->u.lov;
854         int rc = 0, i;
855
856         OBD_ALLOC(set, sizeof(*set));
857         if (set == NULL)
858                 RETURN(-ENOMEM);
859         lov_init_set(set);
860
861         set->set_exp = exp;
862         set->set_oi = oinfo;
863
864         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
865                 struct lov_oinfo *loi;
866                 struct lov_request *req;
867
868                 loi = oinfo->oi_md->lsm_oinfo[i];
869                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
870                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
871                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
872                                 /* SOM requires all the OSTs to be active. */
873                                 GOTO(out_set, rc = -EIO);
874                         continue;
875                 }
876
877                 OBD_ALLOC(req, sizeof(*req));
878                 if (req == NULL)
879                         GOTO(out_set, rc = -ENOMEM);
880
881                 req->rq_stripe = i;
882                 req->rq_idx = loi->loi_ost_idx;
883
884                 OBDO_ALLOC(req->rq_oi.oi_oa);
885                 if (req->rq_oi.oi_oa == NULL) {
886                         OBD_FREE(req, sizeof(*req));
887                         GOTO(out_set, rc = -ENOMEM);
888                 }
889                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
890                        sizeof(*req->rq_oi.oi_oa));
891                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
892                 req->rq_oi.oi_cb_up = cb_getattr_update;
893                 req->rq_oi.oi_capa = oinfo->oi_capa;
894
895                 lov_set_add_req(req, set);
896         }
897         if (!set->set_count)
898                 GOTO(out_set, rc = -EIO);
899         *reqset = set;
900         RETURN(rc);
901 out_set:
902         lov_fini_getattr_set(set);
903         RETURN(rc);
904 }
905
906 int lov_fini_destroy_set(struct lov_request_set *set)
907 {
908         if (set == NULL)
909                 RETURN(0);
910         LASSERT(set->set_exp);
911         if (atomic_read(&set->set_completes)) {
912                 /* FIXME update qos data here */
913         }
914
915         lov_put_reqset(set);
916
917         RETURN(0);
918 }
919
920 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
921                          struct obdo *src_oa, struct lov_stripe_md *lsm,
922                          struct obd_trans_info *oti,
923                          struct lov_request_set **reqset)
924 {
925         struct lov_request_set *set;
926         struct lov_obd *lov = &exp->exp_obd->u.lov;
927         int rc = 0, i;
928
929         OBD_ALLOC(set, sizeof(*set));
930         if (set == NULL)
931                 RETURN(-ENOMEM);
932         lov_init_set(set);
933
934         set->set_exp = exp;
935         set->set_oi = oinfo;
936         set->set_oi->oi_md = lsm;
937         set->set_oi->oi_oa = src_oa;
938         set->set_oti = oti;
939         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
940                 set->set_cookies = oti->oti_logcookies;
941
942         for (i = 0; i < lsm->lsm_stripe_count; i++) {
943                 struct lov_oinfo *loi;
944                 struct lov_request *req;
945
946                 loi = lsm->lsm_oinfo[i];
947                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
948                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
949                         continue;
950                 }
951
952                 OBD_ALLOC(req, sizeof(*req));
953                 if (req == NULL)
954                         GOTO(out_set, rc = -ENOMEM);
955
956                 req->rq_stripe = i;
957                 req->rq_idx = loi->loi_ost_idx;
958
959                 OBDO_ALLOC(req->rq_oi.oi_oa);
960                 if (req->rq_oi.oi_oa == NULL) {
961                         OBD_FREE(req, sizeof(*req));
962                         GOTO(out_set, rc = -ENOMEM);
963                 }
964                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
965                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
966                 lov_set_add_req(req, set);
967         }
968         if (!set->set_count)
969                 GOTO(out_set, rc = -EIO);
970         *reqset = set;
971         RETURN(rc);
972 out_set:
973         lov_fini_destroy_set(set);
974         RETURN(rc);
975 }
976
977 int lov_fini_setattr_set(struct lov_request_set *set)
978 {
979         int rc = 0;
980
981         if (set == NULL)
982                 RETURN(0);
983         LASSERT(set->set_exp);
984         if (atomic_read(&set->set_completes)) {
985                 rc = common_attr_done(set);
986                 /* FIXME update qos data here */
987         }
988
989         lov_put_reqset(set);
990         RETURN(rc);
991 }
992
993 int lov_update_setattr_set(struct lov_request_set *set,
994                            struct lov_request *req, int rc)
995 {
996         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
997         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
998
999         lov_update_set(set, req, rc);
1000
1001         /* grace error on inactive ost */
1002         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1003                     lov->lov_tgts[req->rq_idx]->ltd_active))
1004                 rc = 0;
1005
1006         if (rc == 0) {
1007                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1008                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1009                                 req->rq_oi.oi_oa->o_ctime;
1010                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1011                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1012                                 req->rq_oi.oi_oa->o_mtime;
1013                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1014                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1015                                 req->rq_oi.oi_oa->o_atime;
1016         }
1017
1018         RETURN(rc);
1019 }
1020
1021 /* The callback for osc_setattr_async that finilizes a request info when a
1022  * response is received. */
1023 static int cb_setattr_update(void *cookie, int rc)
1024 {
1025         struct obd_info *oinfo = cookie;
1026         struct lov_request *lovreq;
1027         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1028         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1029 }
1030
1031 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1032                          struct obd_trans_info *oti,
1033                          struct lov_request_set **reqset)
1034 {
1035         struct lov_request_set *set;
1036         struct lov_obd *lov = &exp->exp_obd->u.lov;
1037         int rc = 0, i;
1038
1039         OBD_ALLOC(set, sizeof(*set));
1040         if (set == NULL)
1041                 RETURN(-ENOMEM);
1042         lov_init_set(set);
1043
1044         set->set_exp = exp;
1045         set->set_oti = oti;
1046         set->set_oi = oinfo;
1047         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1048                 set->set_cookies = oti->oti_logcookies;
1049
1050         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1051                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1052                 struct lov_request *req;
1053
1054                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1055                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1056                         continue;
1057                 }
1058
1059                 OBD_ALLOC(req, sizeof(*req));
1060                 if (req == NULL)
1061                         GOTO(out_set, rc = -ENOMEM);
1062                 req->rq_stripe = i;
1063                 req->rq_idx = loi->loi_ost_idx;
1064
1065                 OBDO_ALLOC(req->rq_oi.oi_oa);
1066                 if (req->rq_oi.oi_oa == NULL) {
1067                         OBD_FREE(req, sizeof(*req));
1068                         GOTO(out_set, rc = -ENOMEM);
1069                 }
1070                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1071                        sizeof(*req->rq_oi.oi_oa));
1072                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1073                 req->rq_oi.oi_oa->o_stripe_idx = i;
1074                 req->rq_oi.oi_cb_up = cb_setattr_update;
1075                 req->rq_oi.oi_capa = oinfo->oi_capa;
1076
1077                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1078                         int off = lov_stripe_offset(oinfo->oi_md,
1079                                                     oinfo->oi_oa->o_size, i,
1080                                                     &req->rq_oi.oi_oa->o_size);
1081
1082                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1083                                 req->rq_oi.oi_oa->o_size--;
1084
1085                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1086                                i, req->rq_oi.oi_oa->o_size,
1087                                oinfo->oi_oa->o_size);
1088                 }
1089                 lov_set_add_req(req, set);
1090         }
1091         if (!set->set_count)
1092                 GOTO(out_set, rc = -EIO);
1093         *reqset = set;
1094         RETURN(rc);
1095 out_set:
1096         lov_fini_setattr_set(set);
1097         RETURN(rc);
1098 }
1099
1100 int lov_fini_punch_set(struct lov_request_set *set)
1101 {
1102         int rc = 0;
1103
1104         if (set == NULL)
1105                 RETURN(0);
1106         LASSERT(set->set_exp);
1107         if (atomic_read(&set->set_completes)) {
1108                 rc = -EIO;
1109                 /* FIXME update qos data here */
1110                 if (atomic_read(&set->set_success))
1111                         rc = common_attr_done(set);
1112         }
1113
1114         lov_put_reqset(set);
1115
1116         RETURN(rc);
1117 }
1118
1119 int lov_update_punch_set(struct lov_request_set *set,
1120                          struct lov_request *req, int rc)
1121 {
1122         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1123         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1124
1125         lov_update_set(set, req, rc);
1126
1127         /* grace error on inactive ost */
1128         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1129                 rc = 0;
1130
1131         if (rc == 0) {
1132                 lov_stripe_lock(lsm);
1133                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1134                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1135                                 req->rq_oi.oi_oa->o_blocks;
1136                 }
1137
1138                 lov_stripe_unlock(lsm);
1139         }
1140
1141         RETURN(rc);
1142 }
1143
1144 /* The callback for osc_punch that finilizes a request info when a response
1145  * is received. */
1146 static int cb_update_punch(void *cookie, int rc)
1147 {
1148         struct obd_info *oinfo = cookie;
1149         struct lov_request *lovreq;
1150         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1151         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1152 }
1153
1154 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1155                        struct obd_trans_info *oti,
1156                        struct lov_request_set **reqset)
1157 {
1158         struct lov_request_set *set;
1159         struct lov_obd *lov = &exp->exp_obd->u.lov;
1160         int rc = 0, i;
1161
1162         OBD_ALLOC(set, sizeof(*set));
1163         if (set == NULL)
1164                 RETURN(-ENOMEM);
1165         lov_init_set(set);
1166
1167         set->set_oi = oinfo;
1168         set->set_exp = exp;
1169
1170         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1171                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1172                 struct lov_request *req;
1173                 obd_off rs, re;
1174
1175                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1176                                            oinfo->oi_policy.l_extent.start,
1177                                            oinfo->oi_policy.l_extent.end,
1178                                            &rs, &re))
1179                         continue;
1180
1181                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1182                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1183                         GOTO(out_set, rc = -EIO);
1184                 }
1185
1186                 OBD_ALLOC(req, sizeof(*req));
1187                 if (req == NULL)
1188                         GOTO(out_set, rc = -ENOMEM);
1189                 req->rq_stripe = i;
1190                 req->rq_idx = loi->loi_ost_idx;
1191
1192                 OBDO_ALLOC(req->rq_oi.oi_oa);
1193                 if (req->rq_oi.oi_oa == NULL) {
1194                         OBD_FREE(req, sizeof(*req));
1195                         GOTO(out_set, rc = -ENOMEM);
1196                 }
1197                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1198                        sizeof(*req->rq_oi.oi_oa));
1199                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1200                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1201
1202                 req->rq_oi.oi_oa->o_stripe_idx = i;
1203                 req->rq_oi.oi_cb_up = cb_update_punch;
1204
1205                 req->rq_oi.oi_policy.l_extent.start = rs;
1206                 req->rq_oi.oi_policy.l_extent.end = re;
1207                 req->rq_oi.oi_policy.l_extent.gid = -1;
1208
1209                 req->rq_oi.oi_capa = oinfo->oi_capa;
1210
1211                 lov_set_add_req(req, set);
1212         }
1213         if (!set->set_count)
1214                 GOTO(out_set, rc = -EIO);
1215         *reqset = set;
1216         RETURN(rc);
1217 out_set:
1218         lov_fini_punch_set(set);
1219         RETURN(rc);
1220 }
1221
1222 int lov_fini_sync_set(struct lov_request_set *set)
1223 {
1224         int rc = 0;
1225
1226         if (set == NULL)
1227                 RETURN(0);
1228         LASSERT(set->set_exp);
1229         if (atomic_read(&set->set_completes)) {
1230                 if (!atomic_read(&set->set_success))
1231                         rc = -EIO;
1232                 /* FIXME update qos data here */
1233         }
1234
1235         lov_put_reqset(set);
1236
1237         RETURN(rc);
1238 }
1239
1240 /* The callback for osc_sync that finilizes a request info when a
1241  * response is recieved. */
1242 static int cb_sync_update(void *cookie, int rc)
1243 {
1244         struct obd_info *oinfo = cookie;
1245         struct lov_request *lovreq;
1246
1247         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1248         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1249 }
1250
1251 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1252                       obd_off start, obd_off end,
1253                       struct lov_request_set **reqset)
1254 {
1255         struct lov_request_set *set;
1256         struct lov_obd *lov = &exp->exp_obd->u.lov;
1257         int rc = 0, i;
1258
1259         OBD_ALLOC_PTR(set);
1260         if (set == NULL)
1261                 RETURN(-ENOMEM);
1262         lov_init_set(set);
1263
1264         set->set_exp = exp;
1265         set->set_oi = oinfo;
1266
1267         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1268                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1269                 struct lov_request *req;
1270                 obd_off rs, re;
1271
1272                 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
1273                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1274                         continue;
1275                 }
1276
1277                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1278                                            &re))
1279                         continue;
1280
1281                 OBD_ALLOC_PTR(req);
1282                 if (req == NULL)
1283                         GOTO(out_set, rc = -ENOMEM);
1284                 req->rq_stripe = i;
1285                 req->rq_idx = loi->loi_ost_idx;
1286
1287                 OBDO_ALLOC(req->rq_oi.oi_oa);
1288                 if (req->rq_oi.oi_oa == NULL) {
1289                         OBD_FREE(req, sizeof(*req));
1290                         GOTO(out_set, rc = -ENOMEM);
1291                 }
1292                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1293                 req->rq_oi.oi_oa->o_oi = loi->loi_oi;
1294                 req->rq_oi.oi_oa->o_stripe_idx = i;
1295
1296                 req->rq_oi.oi_policy.l_extent.start = rs;
1297                 req->rq_oi.oi_policy.l_extent.end = re;
1298                 req->rq_oi.oi_policy.l_extent.gid = -1;
1299                 req->rq_oi.oi_cb_up = cb_sync_update;
1300
1301                 lov_set_add_req(req, set);
1302         }
1303         if (!set->set_count)
1304                 GOTO(out_set, rc = -EIO);
1305         *reqset = set;
1306         RETURN(rc);
1307 out_set:
1308         lov_fini_sync_set(set);
1309         RETURN(rc);
1310 }
1311
1312 #define LOV_U64_MAX ((__u64)~0ULL)
1313 #define LOV_SUM_MAX(tot, add)                                      \
1314         do {                                                        \
1315                 if ((tot) + (add) < (tot))                            \
1316                         (tot) = LOV_U64_MAX;                        \
1317                 else                                                \
1318                         (tot) += (add);                          \
1319         } while(0)
1320
1321 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1322 {
1323         if (success) {
1324                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1325                                                            LOV_MAGIC, 0);
1326                 if (osfs->os_files != LOV_U64_MAX)
1327                         lov_do_div64(osfs->os_files, expected_stripes);
1328                 if (osfs->os_ffree != LOV_U64_MAX)
1329                         lov_do_div64(osfs->os_ffree, expected_stripes);
1330
1331                 spin_lock(&obd->obd_osfs_lock);
1332                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1333                 obd->obd_osfs_age = cfs_time_current_64();
1334                 spin_unlock(&obd->obd_osfs_lock);
1335                 RETURN(0);
1336         }
1337
1338         RETURN(-EIO);
1339 }
1340
1341 int lov_fini_statfs_set(struct lov_request_set *set)
1342 {
1343         int rc = 0;
1344
1345         if (set == NULL)
1346                 RETURN(0);
1347
1348         if (atomic_read(&set->set_completes)) {
1349                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1350                                      atomic_read(&set->set_success));
1351         }
1352         lov_put_reqset(set);
1353         RETURN(rc);
1354 }
1355
1356 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1357                        int success)
1358 {
1359         int shift = 0, quit = 0;
1360         __u64 tmp;
1361
1362         if (success == 0) {
1363                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1364         } else {
1365                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1366                         /* assume all block sizes are always powers of 2 */
1367                         /* get the bits difference */
1368                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1369                         for (shift = 0; shift <= 64; ++shift) {
1370                                 if (tmp & 1) {
1371                                         if (quit)
1372                                                 break;
1373                                         else
1374                                                 quit = 1;
1375                                         shift = 0;
1376                                 }
1377                                 tmp >>= 1;
1378                         }
1379                 }
1380
1381                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1382                         osfs->os_bsize = lov_sfs->os_bsize;
1383
1384                         osfs->os_bfree  >>= shift;
1385                         osfs->os_bavail >>= shift;
1386                         osfs->os_blocks >>= shift;
1387                 } else if (shift != 0) {
1388                         lov_sfs->os_bfree  >>= shift;
1389                         lov_sfs->os_bavail >>= shift;
1390                         lov_sfs->os_blocks >>= shift;
1391                 }
1392                 osfs->os_bfree += lov_sfs->os_bfree;
1393                 osfs->os_bavail += lov_sfs->os_bavail;
1394                 osfs->os_blocks += lov_sfs->os_blocks;
1395                 /* XXX not sure about this one - depends on policy.
1396                  *   - could be minimum if we always stripe on all OBDs
1397                  *     (but that would be wrong for any other policy,
1398                  *     if one of the OBDs has no more objects left)
1399                  *   - could be sum if we stripe whole objects
1400                  *   - could be average, just to give a nice number
1401                  *
1402                  * To give a "reasonable" (if not wholly accurate)
1403                  * number, we divide the total number of free objects
1404                  * by expected stripe count (watch out for overflow).
1405                  */
1406                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1407                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1408         }
1409 }
1410
1411 /* The callback for osc_statfs_async that finilizes a request info when a
1412  * response is received. */
1413 static int cb_statfs_update(void *cookie, int rc)
1414 {
1415         struct obd_info *oinfo = cookie;
1416         struct lov_request *lovreq;
1417         struct lov_request_set *set;
1418         struct obd_statfs *osfs, *lov_sfs;
1419         struct lov_obd *lov;
1420         struct lov_tgt_desc *tgt;
1421         struct obd_device *lovobd, *tgtobd;
1422         int success;
1423
1424         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1425         set = lovreq->rq_rqset;
1426         lovobd = set->set_obd;
1427         lov = &lovobd->u.lov;
1428         osfs = set->set_oi->oi_osfs;
1429         lov_sfs = oinfo->oi_osfs;
1430         success = atomic_read(&set->set_success);
1431         /* XXX: the same is done in lov_update_common_set, however
1432            lovset->set_exp is not initialized. */
1433         lov_update_set(set, lovreq, rc);
1434         if (rc)
1435                 GOTO(out, rc);
1436
1437         obd_getref(lovobd);
1438         tgt = lov->lov_tgts[lovreq->rq_idx];
1439         if (!tgt || !tgt->ltd_active)
1440                 GOTO(out_update, rc);
1441
1442         tgtobd = class_exp2obd(tgt->ltd_exp);
1443         spin_lock(&tgtobd->obd_osfs_lock);
1444         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1445         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1446                 tgtobd->obd_osfs_age = cfs_time_current_64();
1447         spin_unlock(&tgtobd->obd_osfs_lock);
1448
1449 out_update:
1450         lov_update_statfs(osfs, lov_sfs, success);
1451         obd_putref(lovobd);
1452
1453 out:
1454         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1455             lov_set_finished(set, 0)) {
1456                 lov_statfs_interpret(NULL, set, set->set_count !=
1457                                      atomic_read(&set->set_success));
1458         }
1459
1460         RETURN(0);
1461 }
1462
1463 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1464                         struct lov_request_set **reqset)
1465 {
1466         struct lov_request_set *set;
1467         struct lov_obd *lov = &obd->u.lov;
1468         int rc = 0, i;
1469
1470         OBD_ALLOC(set, sizeof(*set));
1471         if (set == NULL)
1472                 RETURN(-ENOMEM);
1473         lov_init_set(set);
1474
1475         set->set_obd = obd;
1476         set->set_oi = oinfo;
1477
1478         /* We only get block data from the OBD */
1479         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1480                 struct lov_request *req;
1481
1482                 if (lov->lov_tgts[i] == NULL ||
1483                     (!lov_check_and_wait_active(lov, i) &&
1484                      (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1485                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1486                         continue;
1487                 }
1488
1489                 /* skip targets that have been explicitely disabled by the
1490                  * administrator */
1491                 if (!lov->lov_tgts[i]->ltd_exp) {
1492                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1493                         continue;
1494                 }
1495
1496                 OBD_ALLOC(req, sizeof(*req));
1497                 if (req == NULL)
1498                         GOTO(out_set, rc = -ENOMEM);
1499
1500                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1501                 if (req->rq_oi.oi_osfs == NULL) {
1502                         OBD_FREE(req, sizeof(*req));
1503                         GOTO(out_set, rc = -ENOMEM);
1504                 }
1505
1506                 req->rq_idx = i;
1507                 req->rq_oi.oi_cb_up = cb_statfs_update;
1508                 req->rq_oi.oi_flags = oinfo->oi_flags;
1509
1510                 lov_set_add_req(req, set);
1511         }
1512         if (!set->set_count)
1513                 GOTO(out_set, rc = -EIO);
1514         *reqset = set;
1515         RETURN(rc);
1516 out_set:
1517         lov_fini_statfs_set(set);
1518         RETURN(rc);
1519 }