ceph: close old con before reopening on mds reconnect
[firefly-linux-kernel-4.4.55.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9
10 #include "super.h"
11 #include "mds_client.h"
12
13 #include <linux/ceph/ceph_features.h>
14 #include <linux/ceph/messenger.h>
15 #include <linux/ceph/decode.h>
16 #include <linux/ceph/pagelist.h>
17 #include <linux/ceph/auth.h>
18 #include <linux/ceph/debugfs.h>
19
20 /*
21  * A cluster of MDS (metadata server) daemons is responsible for
22  * managing the file system namespace (the directory hierarchy and
23  * inodes) and for coordinating shared access to storage.  Metadata is
24  * partitioning hierarchically across a number of servers, and that
25  * partition varies over time as the cluster adjusts the distribution
26  * in order to balance load.
27  *
28  * The MDS client is primarily responsible to managing synchronous
29  * metadata requests for operations like open, unlink, and so forth.
30  * If there is a MDS failure, we find out about it when we (possibly
31  * request and) receive a new MDS map, and can resubmit affected
32  * requests.
33  *
34  * For the most part, though, we take advantage of a lossless
35  * communications channel to the MDS, and do not need to worry about
36  * timing out or resubmitting requests.
37  *
38  * We maintain a stateful "session" with each MDS we interact with.
39  * Within each session, we sent periodic heartbeat messages to ensure
40  * any capabilities or leases we have been issues remain valid.  If
41  * the session times out and goes stale, our leases and capabilities
42  * are no longer valid.
43  */
44
45 struct ceph_reconnect_state {
46         struct ceph_pagelist *pagelist;
47         bool flock;
48 };
49
50 static void __wake_requests(struct ceph_mds_client *mdsc,
51                             struct list_head *head);
52
53 static const struct ceph_connection_operations mds_con_ops;
54
55
56 /*
57  * mds reply parsing
58  */
59
60 /*
61  * parse individual inode info
62  */
63 static int parse_reply_info_in(void **p, void *end,
64                                struct ceph_mds_reply_info_in *info,
65                                int features)
66 {
67         int err = -EIO;
68
69         info->in = *p;
70         *p += sizeof(struct ceph_mds_reply_inode) +
71                 sizeof(*info->in->fragtree.splits) *
72                 le32_to_cpu(info->in->fragtree.nsplits);
73
74         ceph_decode_32_safe(p, end, info->symlink_len, bad);
75         ceph_decode_need(p, end, info->symlink_len, bad);
76         info->symlink = *p;
77         *p += info->symlink_len;
78
79         if (features & CEPH_FEATURE_DIRLAYOUTHASH)
80                 ceph_decode_copy_safe(p, end, &info->dir_layout,
81                                       sizeof(info->dir_layout), bad);
82         else
83                 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
84
85         ceph_decode_32_safe(p, end, info->xattr_len, bad);
86         ceph_decode_need(p, end, info->xattr_len, bad);
87         info->xattr_data = *p;
88         *p += info->xattr_len;
89         return 0;
90 bad:
91         return err;
92 }
93
94 /*
95  * parse a normal reply, which may contain a (dir+)dentry and/or a
96  * target inode.
97  */
98 static int parse_reply_info_trace(void **p, void *end,
99                                   struct ceph_mds_reply_info_parsed *info,
100                                   int features)
101 {
102         int err;
103
104         if (info->head->is_dentry) {
105                 err = parse_reply_info_in(p, end, &info->diri, features);
106                 if (err < 0)
107                         goto out_bad;
108
109                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
110                         goto bad;
111                 info->dirfrag = *p;
112                 *p += sizeof(*info->dirfrag) +
113                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
114                 if (unlikely(*p > end))
115                         goto bad;
116
117                 ceph_decode_32_safe(p, end, info->dname_len, bad);
118                 ceph_decode_need(p, end, info->dname_len, bad);
119                 info->dname = *p;
120                 *p += info->dname_len;
121                 info->dlease = *p;
122                 *p += sizeof(*info->dlease);
123         }
124
125         if (info->head->is_target) {
126                 err = parse_reply_info_in(p, end, &info->targeti, features);
127                 if (err < 0)
128                         goto out_bad;
129         }
130
131         if (unlikely(*p != end))
132                 goto bad;
133         return 0;
134
135 bad:
136         err = -EIO;
137 out_bad:
138         pr_err("problem parsing mds trace %d\n", err);
139         return err;
140 }
141
142 /*
143  * parse readdir results
144  */
145 static int parse_reply_info_dir(void **p, void *end,
146                                 struct ceph_mds_reply_info_parsed *info,
147                                 int features)
148 {
149         u32 num, i = 0;
150         int err;
151
152         info->dir_dir = *p;
153         if (*p + sizeof(*info->dir_dir) > end)
154                 goto bad;
155         *p += sizeof(*info->dir_dir) +
156                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
157         if (*p > end)
158                 goto bad;
159
160         ceph_decode_need(p, end, sizeof(num) + 2, bad);
161         num = ceph_decode_32(p);
162         info->dir_end = ceph_decode_8(p);
163         info->dir_complete = ceph_decode_8(p);
164         if (num == 0)
165                 goto done;
166
167         /* alloc large array */
168         info->dir_nr = num;
169         info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
170                                sizeof(*info->dir_dname) +
171                                sizeof(*info->dir_dname_len) +
172                                sizeof(*info->dir_dlease),
173                                GFP_NOFS);
174         if (info->dir_in == NULL) {
175                 err = -ENOMEM;
176                 goto out_bad;
177         }
178         info->dir_dname = (void *)(info->dir_in + num);
179         info->dir_dname_len = (void *)(info->dir_dname + num);
180         info->dir_dlease = (void *)(info->dir_dname_len + num);
181
182         while (num) {
183                 /* dentry */
184                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
185                 info->dir_dname_len[i] = ceph_decode_32(p);
186                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
187                 info->dir_dname[i] = *p;
188                 *p += info->dir_dname_len[i];
189                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
190                      info->dir_dname[i]);
191                 info->dir_dlease[i] = *p;
192                 *p += sizeof(struct ceph_mds_reply_lease);
193
194                 /* inode */
195                 err = parse_reply_info_in(p, end, &info->dir_in[i], features);
196                 if (err < 0)
197                         goto out_bad;
198                 i++;
199                 num--;
200         }
201
202 done:
203         if (*p != end)
204                 goto bad;
205         return 0;
206
207 bad:
208         err = -EIO;
209 out_bad:
210         pr_err("problem parsing dir contents %d\n", err);
211         return err;
212 }
213
214 /*
215  * parse fcntl F_GETLK results
216  */
217 static int parse_reply_info_filelock(void **p, void *end,
218                                      struct ceph_mds_reply_info_parsed *info,
219                                      int features)
220 {
221         if (*p + sizeof(*info->filelock_reply) > end)
222                 goto bad;
223
224         info->filelock_reply = *p;
225         *p += sizeof(*info->filelock_reply);
226
227         if (unlikely(*p != end))
228                 goto bad;
229         return 0;
230
231 bad:
232         return -EIO;
233 }
234
235 /*
236  * parse extra results
237  */
238 static int parse_reply_info_extra(void **p, void *end,
239                                   struct ceph_mds_reply_info_parsed *info,
240                                   int features)
241 {
242         if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
243                 return parse_reply_info_filelock(p, end, info, features);
244         else
245                 return parse_reply_info_dir(p, end, info, features);
246 }
247
248 /*
249  * parse entire mds reply
250  */
251 static int parse_reply_info(struct ceph_msg *msg,
252                             struct ceph_mds_reply_info_parsed *info,
253                             int features)
254 {
255         void *p, *end;
256         u32 len;
257         int err;
258
259         info->head = msg->front.iov_base;
260         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
261         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
262
263         /* trace */
264         ceph_decode_32_safe(&p, end, len, bad);
265         if (len > 0) {
266                 ceph_decode_need(&p, end, len, bad);
267                 err = parse_reply_info_trace(&p, p+len, info, features);
268                 if (err < 0)
269                         goto out_bad;
270         }
271
272         /* extra */
273         ceph_decode_32_safe(&p, end, len, bad);
274         if (len > 0) {
275                 ceph_decode_need(&p, end, len, bad);
276                 err = parse_reply_info_extra(&p, p+len, info, features);
277                 if (err < 0)
278                         goto out_bad;
279         }
280
281         /* snap blob */
282         ceph_decode_32_safe(&p, end, len, bad);
283         info->snapblob_len = len;
284         info->snapblob = p;
285         p += len;
286
287         if (p != end)
288                 goto bad;
289         return 0;
290
291 bad:
292         err = -EIO;
293 out_bad:
294         pr_err("mds parse_reply err %d\n", err);
295         return err;
296 }
297
298 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
299 {
300         kfree(info->dir_in);
301 }
302
303
304 /*
305  * sessions
306  */
307 static const char *session_state_name(int s)
308 {
309         switch (s) {
310         case CEPH_MDS_SESSION_NEW: return "new";
311         case CEPH_MDS_SESSION_OPENING: return "opening";
312         case CEPH_MDS_SESSION_OPEN: return "open";
313         case CEPH_MDS_SESSION_HUNG: return "hung";
314         case CEPH_MDS_SESSION_CLOSING: return "closing";
315         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
316         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
317         default: return "???";
318         }
319 }
320
321 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
322 {
323         if (atomic_inc_not_zero(&s->s_ref)) {
324                 dout("mdsc get_session %p %d -> %d\n", s,
325                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
326                 return s;
327         } else {
328                 dout("mdsc get_session %p 0 -- FAIL", s);
329                 return NULL;
330         }
331 }
332
333 void ceph_put_mds_session(struct ceph_mds_session *s)
334 {
335         dout("mdsc put_session %p %d -> %d\n", s,
336              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
337         if (atomic_dec_and_test(&s->s_ref)) {
338                 if (s->s_auth.authorizer)
339                      s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
340                              s->s_mdsc->fsc->client->monc.auth,
341                              s->s_auth.authorizer);
342                 kfree(s);
343         }
344 }
345
346 /*
347  * called under mdsc->mutex
348  */
349 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
350                                                    int mds)
351 {
352         struct ceph_mds_session *session;
353
354         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
355                 return NULL;
356         session = mdsc->sessions[mds];
357         dout("lookup_mds_session %p %d\n", session,
358              atomic_read(&session->s_ref));
359         get_session(session);
360         return session;
361 }
362
363 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
364 {
365         if (mds >= mdsc->max_sessions)
366                 return false;
367         return mdsc->sessions[mds];
368 }
369
370 static int __verify_registered_session(struct ceph_mds_client *mdsc,
371                                        struct ceph_mds_session *s)
372 {
373         if (s->s_mds >= mdsc->max_sessions ||
374             mdsc->sessions[s->s_mds] != s)
375                 return -ENOENT;
376         return 0;
377 }
378
379 /*
380  * create+register a new session for given mds.
381  * called under mdsc->mutex.
382  */
383 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
384                                                  int mds)
385 {
386         struct ceph_mds_session *s;
387
388         s = kzalloc(sizeof(*s), GFP_NOFS);
389         if (!s)
390                 return ERR_PTR(-ENOMEM);
391         s->s_mdsc = mdsc;
392         s->s_mds = mds;
393         s->s_state = CEPH_MDS_SESSION_NEW;
394         s->s_ttl = 0;
395         s->s_seq = 0;
396         mutex_init(&s->s_mutex);
397
398         ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
399
400         spin_lock_init(&s->s_gen_ttl_lock);
401         s->s_cap_gen = 0;
402         s->s_cap_ttl = jiffies - 1;
403
404         spin_lock_init(&s->s_cap_lock);
405         s->s_renew_requested = 0;
406         s->s_renew_seq = 0;
407         INIT_LIST_HEAD(&s->s_caps);
408         s->s_nr_caps = 0;
409         s->s_trim_caps = 0;
410         atomic_set(&s->s_ref, 1);
411         INIT_LIST_HEAD(&s->s_waiting);
412         INIT_LIST_HEAD(&s->s_unsafe);
413         s->s_num_cap_releases = 0;
414         s->s_cap_iterator = NULL;
415         INIT_LIST_HEAD(&s->s_cap_releases);
416         INIT_LIST_HEAD(&s->s_cap_releases_done);
417         INIT_LIST_HEAD(&s->s_cap_flushing);
418         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
419
420         dout("register_session mds%d\n", mds);
421         if (mds >= mdsc->max_sessions) {
422                 int newmax = 1 << get_count_order(mds+1);
423                 struct ceph_mds_session **sa;
424
425                 dout("register_session realloc to %d\n", newmax);
426                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
427                 if (sa == NULL)
428                         goto fail_realloc;
429                 if (mdsc->sessions) {
430                         memcpy(sa, mdsc->sessions,
431                                mdsc->max_sessions * sizeof(void *));
432                         kfree(mdsc->sessions);
433                 }
434                 mdsc->sessions = sa;
435                 mdsc->max_sessions = newmax;
436         }
437         mdsc->sessions[mds] = s;
438         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
439
440         ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
441                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
442
443         return s;
444
445 fail_realloc:
446         kfree(s);
447         return ERR_PTR(-ENOMEM);
448 }
449
450 /*
451  * called under mdsc->mutex
452  */
453 static void __unregister_session(struct ceph_mds_client *mdsc,
454                                struct ceph_mds_session *s)
455 {
456         dout("__unregister_session mds%d %p\n", s->s_mds, s);
457         BUG_ON(mdsc->sessions[s->s_mds] != s);
458         mdsc->sessions[s->s_mds] = NULL;
459         ceph_con_close(&s->s_con);
460         ceph_put_mds_session(s);
461 }
462
463 /*
464  * drop session refs in request.
465  *
466  * should be last request ref, or hold mdsc->mutex
467  */
468 static void put_request_session(struct ceph_mds_request *req)
469 {
470         if (req->r_session) {
471                 ceph_put_mds_session(req->r_session);
472                 req->r_session = NULL;
473         }
474 }
475
476 void ceph_mdsc_release_request(struct kref *kref)
477 {
478         struct ceph_mds_request *req = container_of(kref,
479                                                     struct ceph_mds_request,
480                                                     r_kref);
481         if (req->r_request)
482                 ceph_msg_put(req->r_request);
483         if (req->r_reply) {
484                 ceph_msg_put(req->r_reply);
485                 destroy_reply_info(&req->r_reply_info);
486         }
487         if (req->r_inode) {
488                 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
489                 iput(req->r_inode);
490         }
491         if (req->r_locked_dir)
492                 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
493         if (req->r_target_inode)
494                 iput(req->r_target_inode);
495         if (req->r_dentry)
496                 dput(req->r_dentry);
497         if (req->r_old_dentry) {
498                 /*
499                  * track (and drop pins for) r_old_dentry_dir
500                  * separately, since r_old_dentry's d_parent may have
501                  * changed between the dir mutex being dropped and
502                  * this request being freed.
503                  */
504                 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
505                                   CEPH_CAP_PIN);
506                 dput(req->r_old_dentry);
507                 iput(req->r_old_dentry_dir);
508         }
509         kfree(req->r_path1);
510         kfree(req->r_path2);
511         put_request_session(req);
512         ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
513         kfree(req);
514 }
515
516 /*
517  * lookup session, bump ref if found.
518  *
519  * called under mdsc->mutex.
520  */
521 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
522                                              u64 tid)
523 {
524         struct ceph_mds_request *req;
525         struct rb_node *n = mdsc->request_tree.rb_node;
526
527         while (n) {
528                 req = rb_entry(n, struct ceph_mds_request, r_node);
529                 if (tid < req->r_tid)
530                         n = n->rb_left;
531                 else if (tid > req->r_tid)
532                         n = n->rb_right;
533                 else {
534                         ceph_mdsc_get_request(req);
535                         return req;
536                 }
537         }
538         return NULL;
539 }
540
541 static void __insert_request(struct ceph_mds_client *mdsc,
542                              struct ceph_mds_request *new)
543 {
544         struct rb_node **p = &mdsc->request_tree.rb_node;
545         struct rb_node *parent = NULL;
546         struct ceph_mds_request *req = NULL;
547
548         while (*p) {
549                 parent = *p;
550                 req = rb_entry(parent, struct ceph_mds_request, r_node);
551                 if (new->r_tid < req->r_tid)
552                         p = &(*p)->rb_left;
553                 else if (new->r_tid > req->r_tid)
554                         p = &(*p)->rb_right;
555                 else
556                         BUG();
557         }
558
559         rb_link_node(&new->r_node, parent, p);
560         rb_insert_color(&new->r_node, &mdsc->request_tree);
561 }
562
563 /*
564  * Register an in-flight request, and assign a tid.  Link to directory
565  * are modifying (if any).
566  *
567  * Called under mdsc->mutex.
568  */
569 static void __register_request(struct ceph_mds_client *mdsc,
570                                struct ceph_mds_request *req,
571                                struct inode *dir)
572 {
573         req->r_tid = ++mdsc->last_tid;
574         if (req->r_num_caps)
575                 ceph_reserve_caps(mdsc, &req->r_caps_reservation,
576                                   req->r_num_caps);
577         dout("__register_request %p tid %lld\n", req, req->r_tid);
578         ceph_mdsc_get_request(req);
579         __insert_request(mdsc, req);
580
581         req->r_uid = current_fsuid();
582         req->r_gid = current_fsgid();
583
584         if (dir) {
585                 struct ceph_inode_info *ci = ceph_inode(dir);
586
587                 ihold(dir);
588                 spin_lock(&ci->i_unsafe_lock);
589                 req->r_unsafe_dir = dir;
590                 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
591                 spin_unlock(&ci->i_unsafe_lock);
592         }
593 }
594
595 static void __unregister_request(struct ceph_mds_client *mdsc,
596                                  struct ceph_mds_request *req)
597 {
598         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
599         rb_erase(&req->r_node, &mdsc->request_tree);
600         RB_CLEAR_NODE(&req->r_node);
601
602         if (req->r_unsafe_dir) {
603                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
604
605                 spin_lock(&ci->i_unsafe_lock);
606                 list_del_init(&req->r_unsafe_dir_item);
607                 spin_unlock(&ci->i_unsafe_lock);
608
609                 iput(req->r_unsafe_dir);
610                 req->r_unsafe_dir = NULL;
611         }
612
613         ceph_mdsc_put_request(req);
614 }
615
616 /*
617  * Choose mds to send request to next.  If there is a hint set in the
618  * request (e.g., due to a prior forward hint from the mds), use that.
619  * Otherwise, consult frag tree and/or caps to identify the
620  * appropriate mds.  If all else fails, choose randomly.
621  *
622  * Called under mdsc->mutex.
623  */
624 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
625 {
626         /*
627          * we don't need to worry about protecting the d_parent access
628          * here because we never renaming inside the snapped namespace
629          * except to resplice to another snapdir, and either the old or new
630          * result is a valid result.
631          */
632         while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
633                 dentry = dentry->d_parent;
634         return dentry;
635 }
636
637 static int __choose_mds(struct ceph_mds_client *mdsc,
638                         struct ceph_mds_request *req)
639 {
640         struct inode *inode;
641         struct ceph_inode_info *ci;
642         struct ceph_cap *cap;
643         int mode = req->r_direct_mode;
644         int mds = -1;
645         u32 hash = req->r_direct_hash;
646         bool is_hash = req->r_direct_is_hash;
647
648         /*
649          * is there a specific mds we should try?  ignore hint if we have
650          * no session and the mds is not up (active or recovering).
651          */
652         if (req->r_resend_mds >= 0 &&
653             (__have_session(mdsc, req->r_resend_mds) ||
654              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
655                 dout("choose_mds using resend_mds mds%d\n",
656                      req->r_resend_mds);
657                 return req->r_resend_mds;
658         }
659
660         if (mode == USE_RANDOM_MDS)
661                 goto random;
662
663         inode = NULL;
664         if (req->r_inode) {
665                 inode = req->r_inode;
666         } else if (req->r_dentry) {
667                 /* ignore race with rename; old or new d_parent is okay */
668                 struct dentry *parent = req->r_dentry->d_parent;
669                 struct inode *dir = parent->d_inode;
670
671                 if (dir->i_sb != mdsc->fsc->sb) {
672                         /* not this fs! */
673                         inode = req->r_dentry->d_inode;
674                 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
675                         /* direct snapped/virtual snapdir requests
676                          * based on parent dir inode */
677                         struct dentry *dn = get_nonsnap_parent(parent);
678                         inode = dn->d_inode;
679                         dout("__choose_mds using nonsnap parent %p\n", inode);
680                 } else if (req->r_dentry->d_inode) {
681                         /* dentry target */
682                         inode = req->r_dentry->d_inode;
683                 } else {
684                         /* dir + name */
685                         inode = dir;
686                         hash = ceph_dentry_hash(dir, req->r_dentry);
687                         is_hash = true;
688                 }
689         }
690
691         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
692              (int)hash, mode);
693         if (!inode)
694                 goto random;
695         ci = ceph_inode(inode);
696
697         if (is_hash && S_ISDIR(inode->i_mode)) {
698                 struct ceph_inode_frag frag;
699                 int found;
700
701                 ceph_choose_frag(ci, hash, &frag, &found);
702                 if (found) {
703                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
704                                 u8 r;
705
706                                 /* choose a random replica */
707                                 get_random_bytes(&r, 1);
708                                 r %= frag.ndist;
709                                 mds = frag.dist[r];
710                                 dout("choose_mds %p %llx.%llx "
711                                      "frag %u mds%d (%d/%d)\n",
712                                      inode, ceph_vinop(inode),
713                                      frag.frag, mds,
714                                      (int)r, frag.ndist);
715                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
716                                     CEPH_MDS_STATE_ACTIVE)
717                                         return mds;
718                         }
719
720                         /* since this file/dir wasn't known to be
721                          * replicated, then we want to look for the
722                          * authoritative mds. */
723                         mode = USE_AUTH_MDS;
724                         if (frag.mds >= 0) {
725                                 /* choose auth mds */
726                                 mds = frag.mds;
727                                 dout("choose_mds %p %llx.%llx "
728                                      "frag %u mds%d (auth)\n",
729                                      inode, ceph_vinop(inode), frag.frag, mds);
730                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
731                                     CEPH_MDS_STATE_ACTIVE)
732                                         return mds;
733                         }
734                 }
735         }
736
737         spin_lock(&ci->i_ceph_lock);
738         cap = NULL;
739         if (mode == USE_AUTH_MDS)
740                 cap = ci->i_auth_cap;
741         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
742                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
743         if (!cap) {
744                 spin_unlock(&ci->i_ceph_lock);
745                 goto random;
746         }
747         mds = cap->session->s_mds;
748         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
749              inode, ceph_vinop(inode), mds,
750              cap == ci->i_auth_cap ? "auth " : "", cap);
751         spin_unlock(&ci->i_ceph_lock);
752         return mds;
753
754 random:
755         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
756         dout("choose_mds chose random mds%d\n", mds);
757         return mds;
758 }
759
760
761 /*
762  * session messages
763  */
764 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
765 {
766         struct ceph_msg *msg;
767         struct ceph_mds_session_head *h;
768
769         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
770                            false);
771         if (!msg) {
772                 pr_err("create_session_msg ENOMEM creating msg\n");
773                 return NULL;
774         }
775         h = msg->front.iov_base;
776         h->op = cpu_to_le32(op);
777         h->seq = cpu_to_le64(seq);
778         return msg;
779 }
780
781 /*
782  * send session open request.
783  *
784  * called under mdsc->mutex
785  */
786 static int __open_session(struct ceph_mds_client *mdsc,
787                           struct ceph_mds_session *session)
788 {
789         struct ceph_msg *msg;
790         int mstate;
791         int mds = session->s_mds;
792
793         /* wait for mds to go active? */
794         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
795         dout("open_session to mds%d (%s)\n", mds,
796              ceph_mds_state_name(mstate));
797         session->s_state = CEPH_MDS_SESSION_OPENING;
798         session->s_renew_requested = jiffies;
799
800         /* send connect message */
801         msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
802         if (!msg)
803                 return -ENOMEM;
804         ceph_con_send(&session->s_con, msg);
805         return 0;
806 }
807
808 /*
809  * open sessions for any export targets for the given mds
810  *
811  * called under mdsc->mutex
812  */
813 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
814                                           struct ceph_mds_session *session)
815 {
816         struct ceph_mds_info *mi;
817         struct ceph_mds_session *ts;
818         int i, mds = session->s_mds;
819         int target;
820
821         if (mds >= mdsc->mdsmap->m_max_mds)
822                 return;
823         mi = &mdsc->mdsmap->m_info[mds];
824         dout("open_export_target_sessions for mds%d (%d targets)\n",
825              session->s_mds, mi->num_export_targets);
826
827         for (i = 0; i < mi->num_export_targets; i++) {
828                 target = mi->export_targets[i];
829                 ts = __ceph_lookup_mds_session(mdsc, target);
830                 if (!ts) {
831                         ts = register_session(mdsc, target);
832                         if (IS_ERR(ts))
833                                 return;
834                 }
835                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
836                     session->s_state == CEPH_MDS_SESSION_CLOSING)
837                         __open_session(mdsc, session);
838                 else
839                         dout(" mds%d target mds%d %p is %s\n", session->s_mds,
840                              i, ts, session_state_name(ts->s_state));
841                 ceph_put_mds_session(ts);
842         }
843 }
844
845 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
846                                            struct ceph_mds_session *session)
847 {
848         mutex_lock(&mdsc->mutex);
849         __open_export_target_sessions(mdsc, session);
850         mutex_unlock(&mdsc->mutex);
851 }
852
853 /*
854  * session caps
855  */
856
857 /*
858  * Free preallocated cap messages assigned to this session
859  */
860 static void cleanup_cap_releases(struct ceph_mds_session *session)
861 {
862         struct ceph_msg *msg;
863
864         spin_lock(&session->s_cap_lock);
865         while (!list_empty(&session->s_cap_releases)) {
866                 msg = list_first_entry(&session->s_cap_releases,
867                                        struct ceph_msg, list_head);
868                 list_del_init(&msg->list_head);
869                 ceph_msg_put(msg);
870         }
871         while (!list_empty(&session->s_cap_releases_done)) {
872                 msg = list_first_entry(&session->s_cap_releases_done,
873                                        struct ceph_msg, list_head);
874                 list_del_init(&msg->list_head);
875                 ceph_msg_put(msg);
876         }
877         spin_unlock(&session->s_cap_lock);
878 }
879
880 /*
881  * Helper to safely iterate over all caps associated with a session, with
882  * special care taken to handle a racing __ceph_remove_cap().
883  *
884  * Caller must hold session s_mutex.
885  */
886 static int iterate_session_caps(struct ceph_mds_session *session,
887                                  int (*cb)(struct inode *, struct ceph_cap *,
888                                             void *), void *arg)
889 {
890         struct list_head *p;
891         struct ceph_cap *cap;
892         struct inode *inode, *last_inode = NULL;
893         struct ceph_cap *old_cap = NULL;
894         int ret;
895
896         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
897         spin_lock(&session->s_cap_lock);
898         p = session->s_caps.next;
899         while (p != &session->s_caps) {
900                 cap = list_entry(p, struct ceph_cap, session_caps);
901                 inode = igrab(&cap->ci->vfs_inode);
902                 if (!inode) {
903                         p = p->next;
904                         continue;
905                 }
906                 session->s_cap_iterator = cap;
907                 spin_unlock(&session->s_cap_lock);
908
909                 if (last_inode) {
910                         iput(last_inode);
911                         last_inode = NULL;
912                 }
913                 if (old_cap) {
914                         ceph_put_cap(session->s_mdsc, old_cap);
915                         old_cap = NULL;
916                 }
917
918                 ret = cb(inode, cap, arg);
919                 last_inode = inode;
920
921                 spin_lock(&session->s_cap_lock);
922                 p = p->next;
923                 if (cap->ci == NULL) {
924                         dout("iterate_session_caps  finishing cap %p removal\n",
925                              cap);
926                         BUG_ON(cap->session != session);
927                         list_del_init(&cap->session_caps);
928                         session->s_nr_caps--;
929                         cap->session = NULL;
930                         old_cap = cap;  /* put_cap it w/o locks held */
931                 }
932                 if (ret < 0)
933                         goto out;
934         }
935         ret = 0;
936 out:
937         session->s_cap_iterator = NULL;
938         spin_unlock(&session->s_cap_lock);
939
940         if (last_inode)
941                 iput(last_inode);
942         if (old_cap)
943                 ceph_put_cap(session->s_mdsc, old_cap);
944
945         return ret;
946 }
947
948 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
949                                   void *arg)
950 {
951         struct ceph_inode_info *ci = ceph_inode(inode);
952         int drop = 0;
953
954         dout("removing cap %p, ci is %p, inode is %p\n",
955              cap, ci, &ci->vfs_inode);
956         spin_lock(&ci->i_ceph_lock);
957         __ceph_remove_cap(cap);
958         if (!__ceph_is_any_real_caps(ci)) {
959                 struct ceph_mds_client *mdsc =
960                         ceph_sb_to_client(inode->i_sb)->mdsc;
961
962                 spin_lock(&mdsc->cap_dirty_lock);
963                 if (!list_empty(&ci->i_dirty_item)) {
964                         pr_info(" dropping dirty %s state for %p %lld\n",
965                                 ceph_cap_string(ci->i_dirty_caps),
966                                 inode, ceph_ino(inode));
967                         ci->i_dirty_caps = 0;
968                         list_del_init(&ci->i_dirty_item);
969                         drop = 1;
970                 }
971                 if (!list_empty(&ci->i_flushing_item)) {
972                         pr_info(" dropping dirty+flushing %s state for %p %lld\n",
973                                 ceph_cap_string(ci->i_flushing_caps),
974                                 inode, ceph_ino(inode));
975                         ci->i_flushing_caps = 0;
976                         list_del_init(&ci->i_flushing_item);
977                         mdsc->num_cap_flushing--;
978                         drop = 1;
979                 }
980                 if (drop && ci->i_wrbuffer_ref) {
981                         pr_info(" dropping dirty data for %p %lld\n",
982                                 inode, ceph_ino(inode));
983                         ci->i_wrbuffer_ref = 0;
984                         ci->i_wrbuffer_ref_head = 0;
985                         drop++;
986                 }
987                 spin_unlock(&mdsc->cap_dirty_lock);
988         }
989         spin_unlock(&ci->i_ceph_lock);
990         while (drop--)
991                 iput(inode);
992         return 0;
993 }
994
995 /*
996  * caller must hold session s_mutex
997  */
998 static void remove_session_caps(struct ceph_mds_session *session)
999 {
1000         dout("remove_session_caps on %p\n", session);
1001         iterate_session_caps(session, remove_session_caps_cb, NULL);
1002         BUG_ON(session->s_nr_caps > 0);
1003         BUG_ON(!list_empty(&session->s_cap_flushing));
1004         cleanup_cap_releases(session);
1005 }
1006
1007 /*
1008  * wake up any threads waiting on this session's caps.  if the cap is
1009  * old (didn't get renewed on the client reconnect), remove it now.
1010  *
1011  * caller must hold s_mutex.
1012  */
1013 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1014                               void *arg)
1015 {
1016         struct ceph_inode_info *ci = ceph_inode(inode);
1017
1018         wake_up_all(&ci->i_cap_wq);
1019         if (arg) {
1020                 spin_lock(&ci->i_ceph_lock);
1021                 ci->i_wanted_max_size = 0;
1022                 ci->i_requested_max_size = 0;
1023                 spin_unlock(&ci->i_ceph_lock);
1024         }
1025         return 0;
1026 }
1027
1028 static void wake_up_session_caps(struct ceph_mds_session *session,
1029                                  int reconnect)
1030 {
1031         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1032         iterate_session_caps(session, wake_up_session_cb,
1033                              (void *)(unsigned long)reconnect);
1034 }
1035
1036 /*
1037  * Send periodic message to MDS renewing all currently held caps.  The
1038  * ack will reset the expiration for all caps from this session.
1039  *
1040  * caller holds s_mutex
1041  */
1042 static int send_renew_caps(struct ceph_mds_client *mdsc,
1043                            struct ceph_mds_session *session)
1044 {
1045         struct ceph_msg *msg;
1046         int state;
1047
1048         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1049             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1050                 pr_info("mds%d caps stale\n", session->s_mds);
1051         session->s_renew_requested = jiffies;
1052
1053         /* do not try to renew caps until a recovering mds has reconnected
1054          * with its clients. */
1055         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1056         if (state < CEPH_MDS_STATE_RECONNECT) {
1057                 dout("send_renew_caps ignoring mds%d (%s)\n",
1058                      session->s_mds, ceph_mds_state_name(state));
1059                 return 0;
1060         }
1061
1062         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1063                 ceph_mds_state_name(state));
1064         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1065                                  ++session->s_renew_seq);
1066         if (!msg)
1067                 return -ENOMEM;
1068         ceph_con_send(&session->s_con, msg);
1069         return 0;
1070 }
1071
1072 /*
1073  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1074  *
1075  * Called under session->s_mutex
1076  */
1077 static void renewed_caps(struct ceph_mds_client *mdsc,
1078                          struct ceph_mds_session *session, int is_renew)
1079 {
1080         int was_stale;
1081         int wake = 0;
1082
1083         spin_lock(&session->s_cap_lock);
1084         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1085
1086         session->s_cap_ttl = session->s_renew_requested +
1087                 mdsc->mdsmap->m_session_timeout*HZ;
1088
1089         if (was_stale) {
1090                 if (time_before(jiffies, session->s_cap_ttl)) {
1091                         pr_info("mds%d caps renewed\n", session->s_mds);
1092                         wake = 1;
1093                 } else {
1094                         pr_info("mds%d caps still stale\n", session->s_mds);
1095                 }
1096         }
1097         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1098              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1099              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1100         spin_unlock(&session->s_cap_lock);
1101
1102         if (wake)
1103                 wake_up_session_caps(session, 0);
1104 }
1105
1106 /*
1107  * send a session close request
1108  */
1109 static int request_close_session(struct ceph_mds_client *mdsc,
1110                                  struct ceph_mds_session *session)
1111 {
1112         struct ceph_msg *msg;
1113
1114         dout("request_close_session mds%d state %s seq %lld\n",
1115              session->s_mds, session_state_name(session->s_state),
1116              session->s_seq);
1117         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1118         if (!msg)
1119                 return -ENOMEM;
1120         ceph_con_send(&session->s_con, msg);
1121         return 0;
1122 }
1123
1124 /*
1125  * Called with s_mutex held.
1126  */
1127 static int __close_session(struct ceph_mds_client *mdsc,
1128                          struct ceph_mds_session *session)
1129 {
1130         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1131                 return 0;
1132         session->s_state = CEPH_MDS_SESSION_CLOSING;
1133         return request_close_session(mdsc, session);
1134 }
1135
1136 /*
1137  * Trim old(er) caps.
1138  *
1139  * Because we can't cache an inode without one or more caps, we do
1140  * this indirectly: if a cap is unused, we prune its aliases, at which
1141  * point the inode will hopefully get dropped to.
1142  *
1143  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1144  * memory pressure from the MDS, though, so it needn't be perfect.
1145  */
1146 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1147 {
1148         struct ceph_mds_session *session = arg;
1149         struct ceph_inode_info *ci = ceph_inode(inode);
1150         int used, oissued, mine;
1151
1152         if (session->s_trim_caps <= 0)
1153                 return -1;
1154
1155         spin_lock(&ci->i_ceph_lock);
1156         mine = cap->issued | cap->implemented;
1157         used = __ceph_caps_used(ci);
1158         oissued = __ceph_caps_issued_other(ci, cap);
1159
1160         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1161              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1162              ceph_cap_string(used));
1163         if (ci->i_dirty_caps)
1164                 goto out;   /* dirty caps */
1165         if ((used & ~oissued) & mine)
1166                 goto out;   /* we need these caps */
1167
1168         session->s_trim_caps--;
1169         if (oissued) {
1170                 /* we aren't the only cap.. just remove us */
1171                 __ceph_remove_cap(cap);
1172         } else {
1173                 /* try to drop referring dentries */
1174                 spin_unlock(&ci->i_ceph_lock);
1175                 d_prune_aliases(inode);
1176                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1177                      inode, cap, atomic_read(&inode->i_count));
1178                 return 0;
1179         }
1180
1181 out:
1182         spin_unlock(&ci->i_ceph_lock);
1183         return 0;
1184 }
1185
1186 /*
1187  * Trim session cap count down to some max number.
1188  */
1189 static int trim_caps(struct ceph_mds_client *mdsc,
1190                      struct ceph_mds_session *session,
1191                      int max_caps)
1192 {
1193         int trim_caps = session->s_nr_caps - max_caps;
1194
1195         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1196              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1197         if (trim_caps > 0) {
1198                 session->s_trim_caps = trim_caps;
1199                 iterate_session_caps(session, trim_caps_cb, session);
1200                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1201                      session->s_mds, session->s_nr_caps, max_caps,
1202                         trim_caps - session->s_trim_caps);
1203                 session->s_trim_caps = 0;
1204         }
1205         return 0;
1206 }
1207
1208 /*
1209  * Allocate cap_release messages.  If there is a partially full message
1210  * in the queue, try to allocate enough to cover it's remainder, so that
1211  * we can send it immediately.
1212  *
1213  * Called under s_mutex.
1214  */
1215 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1216                           struct ceph_mds_session *session)
1217 {
1218         struct ceph_msg *msg, *partial = NULL;
1219         struct ceph_mds_cap_release *head;
1220         int err = -ENOMEM;
1221         int extra = mdsc->fsc->mount_options->cap_release_safety;
1222         int num;
1223
1224         dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1225              extra);
1226
1227         spin_lock(&session->s_cap_lock);
1228
1229         if (!list_empty(&session->s_cap_releases)) {
1230                 msg = list_first_entry(&session->s_cap_releases,
1231                                        struct ceph_msg,
1232                                  list_head);
1233                 head = msg->front.iov_base;
1234                 num = le32_to_cpu(head->num);
1235                 if (num) {
1236                         dout(" partial %p with (%d/%d)\n", msg, num,
1237                              (int)CEPH_CAPS_PER_RELEASE);
1238                         extra += CEPH_CAPS_PER_RELEASE - num;
1239                         partial = msg;
1240                 }
1241         }
1242         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1243                 spin_unlock(&session->s_cap_lock);
1244                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1245                                    GFP_NOFS, false);
1246                 if (!msg)
1247                         goto out_unlocked;
1248                 dout("add_cap_releases %p msg %p now %d\n", session, msg,
1249                      (int)msg->front.iov_len);
1250                 head = msg->front.iov_base;
1251                 head->num = cpu_to_le32(0);
1252                 msg->front.iov_len = sizeof(*head);
1253                 spin_lock(&session->s_cap_lock);
1254                 list_add(&msg->list_head, &session->s_cap_releases);
1255                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1256         }
1257
1258         if (partial) {
1259                 head = partial->front.iov_base;
1260                 num = le32_to_cpu(head->num);
1261                 dout(" queueing partial %p with %d/%d\n", partial, num,
1262                      (int)CEPH_CAPS_PER_RELEASE);
1263                 list_move_tail(&partial->list_head,
1264                                &session->s_cap_releases_done);
1265                 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1266         }
1267         err = 0;
1268         spin_unlock(&session->s_cap_lock);
1269 out_unlocked:
1270         return err;
1271 }
1272
1273 /*
1274  * flush all dirty inode data to disk.
1275  *
1276  * returns true if we've flushed through want_flush_seq
1277  */
1278 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1279 {
1280         int mds, ret = 1;
1281
1282         dout("check_cap_flush want %lld\n", want_flush_seq);
1283         mutex_lock(&mdsc->mutex);
1284         for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1285                 struct ceph_mds_session *session = mdsc->sessions[mds];
1286
1287                 if (!session)
1288                         continue;
1289                 get_session(session);
1290                 mutex_unlock(&mdsc->mutex);
1291
1292                 mutex_lock(&session->s_mutex);
1293                 if (!list_empty(&session->s_cap_flushing)) {
1294                         struct ceph_inode_info *ci =
1295                                 list_entry(session->s_cap_flushing.next,
1296                                            struct ceph_inode_info,
1297                                            i_flushing_item);
1298                         struct inode *inode = &ci->vfs_inode;
1299
1300                         spin_lock(&ci->i_ceph_lock);
1301                         if (ci->i_cap_flush_seq <= want_flush_seq) {
1302                                 dout("check_cap_flush still flushing %p "
1303                                      "seq %lld <= %lld to mds%d\n", inode,
1304                                      ci->i_cap_flush_seq, want_flush_seq,
1305                                      session->s_mds);
1306                                 ret = 0;
1307                         }
1308                         spin_unlock(&ci->i_ceph_lock);
1309                 }
1310                 mutex_unlock(&session->s_mutex);
1311                 ceph_put_mds_session(session);
1312
1313                 if (!ret)
1314                         return ret;
1315                 mutex_lock(&mdsc->mutex);
1316         }
1317
1318         mutex_unlock(&mdsc->mutex);
1319         dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1320         return ret;
1321 }
1322
1323 /*
1324  * called under s_mutex
1325  */
1326 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1327                             struct ceph_mds_session *session)
1328 {
1329         struct ceph_msg *msg;
1330
1331         dout("send_cap_releases mds%d\n", session->s_mds);
1332         spin_lock(&session->s_cap_lock);
1333         while (!list_empty(&session->s_cap_releases_done)) {
1334                 msg = list_first_entry(&session->s_cap_releases_done,
1335                                  struct ceph_msg, list_head);
1336                 list_del_init(&msg->list_head);
1337                 spin_unlock(&session->s_cap_lock);
1338                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1339                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1340                 ceph_con_send(&session->s_con, msg);
1341                 spin_lock(&session->s_cap_lock);
1342         }
1343         spin_unlock(&session->s_cap_lock);
1344 }
1345
1346 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1347                                  struct ceph_mds_session *session)
1348 {
1349         struct ceph_msg *msg;
1350         struct ceph_mds_cap_release *head;
1351         unsigned num;
1352
1353         dout("discard_cap_releases mds%d\n", session->s_mds);
1354         spin_lock(&session->s_cap_lock);
1355
1356         /* zero out the in-progress message */
1357         msg = list_first_entry(&session->s_cap_releases,
1358                                struct ceph_msg, list_head);
1359         head = msg->front.iov_base;
1360         num = le32_to_cpu(head->num);
1361         dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1362         head->num = cpu_to_le32(0);
1363         session->s_num_cap_releases += num;
1364
1365         /* requeue completed messages */
1366         while (!list_empty(&session->s_cap_releases_done)) {
1367                 msg = list_first_entry(&session->s_cap_releases_done,
1368                                  struct ceph_msg, list_head);
1369                 list_del_init(&msg->list_head);
1370
1371                 head = msg->front.iov_base;
1372                 num = le32_to_cpu(head->num);
1373                 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1374                      num);
1375                 session->s_num_cap_releases += num;
1376                 head->num = cpu_to_le32(0);
1377                 msg->front.iov_len = sizeof(*head);
1378                 list_add(&msg->list_head, &session->s_cap_releases);
1379         }
1380
1381         spin_unlock(&session->s_cap_lock);
1382 }
1383
1384 /*
1385  * requests
1386  */
1387
1388 /*
1389  * Create an mds request.
1390  */
1391 struct ceph_mds_request *
1392 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1393 {
1394         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1395
1396         if (!req)
1397                 return ERR_PTR(-ENOMEM);
1398
1399         mutex_init(&req->r_fill_mutex);
1400         req->r_mdsc = mdsc;
1401         req->r_started = jiffies;
1402         req->r_resend_mds = -1;
1403         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1404         req->r_fmode = -1;
1405         kref_init(&req->r_kref);
1406         INIT_LIST_HEAD(&req->r_wait);
1407         init_completion(&req->r_completion);
1408         init_completion(&req->r_safe_completion);
1409         INIT_LIST_HEAD(&req->r_unsafe_item);
1410
1411         req->r_op = op;
1412         req->r_direct_mode = mode;
1413         return req;
1414 }
1415
1416 /*
1417  * return oldest (lowest) request, tid in request tree, 0 if none.
1418  *
1419  * called under mdsc->mutex.
1420  */
1421 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1422 {
1423         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1424                 return NULL;
1425         return rb_entry(rb_first(&mdsc->request_tree),
1426                         struct ceph_mds_request, r_node);
1427 }
1428
1429 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1430 {
1431         struct ceph_mds_request *req = __get_oldest_req(mdsc);
1432
1433         if (req)
1434                 return req->r_tid;
1435         return 0;
1436 }
1437
1438 /*
1439  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1440  * on build_path_from_dentry in fs/cifs/dir.c.
1441  *
1442  * If @stop_on_nosnap, generate path relative to the first non-snapped
1443  * inode.
1444  *
1445  * Encode hidden .snap dirs as a double /, i.e.
1446  *   foo/.snap/bar -> foo//bar
1447  */
1448 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1449                            int stop_on_nosnap)
1450 {
1451         struct dentry *temp;
1452         char *path;
1453         int len, pos;
1454         unsigned seq;
1455
1456         if (dentry == NULL)
1457                 return ERR_PTR(-EINVAL);
1458
1459 retry:
1460         len = 0;
1461         seq = read_seqbegin(&rename_lock);
1462         rcu_read_lock();
1463         for (temp = dentry; !IS_ROOT(temp);) {
1464                 struct inode *inode = temp->d_inode;
1465                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1466                         len++;  /* slash only */
1467                 else if (stop_on_nosnap && inode &&
1468                          ceph_snap(inode) == CEPH_NOSNAP)
1469                         break;
1470                 else
1471                         len += 1 + temp->d_name.len;
1472                 temp = temp->d_parent;
1473         }
1474         rcu_read_unlock();
1475         if (len)
1476                 len--;  /* no leading '/' */
1477
1478         path = kmalloc(len+1, GFP_NOFS);
1479         if (path == NULL)
1480                 return ERR_PTR(-ENOMEM);
1481         pos = len;
1482         path[pos] = 0;  /* trailing null */
1483         rcu_read_lock();
1484         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1485                 struct inode *inode;
1486
1487                 spin_lock(&temp->d_lock);
1488                 inode = temp->d_inode;
1489                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1490                         dout("build_path path+%d: %p SNAPDIR\n",
1491                              pos, temp);
1492                 } else if (stop_on_nosnap && inode &&
1493                            ceph_snap(inode) == CEPH_NOSNAP) {
1494                         spin_unlock(&temp->d_lock);
1495                         break;
1496                 } else {
1497                         pos -= temp->d_name.len;
1498                         if (pos < 0) {
1499                                 spin_unlock(&temp->d_lock);
1500                                 break;
1501                         }
1502                         strncpy(path + pos, temp->d_name.name,
1503                                 temp->d_name.len);
1504                 }
1505                 spin_unlock(&temp->d_lock);
1506                 if (pos)
1507                         path[--pos] = '/';
1508                 temp = temp->d_parent;
1509         }
1510         rcu_read_unlock();
1511         if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1512                 pr_err("build_path did not end path lookup where "
1513                        "expected, namelen is %d, pos is %d\n", len, pos);
1514                 /* presumably this is only possible if racing with a
1515                    rename of one of the parent directories (we can not
1516                    lock the dentries above us to prevent this, but
1517                    retrying should be harmless) */
1518                 kfree(path);
1519                 goto retry;
1520         }
1521
1522         *base = ceph_ino(temp->d_inode);
1523         *plen = len;
1524         dout("build_path on %p %d built %llx '%.*s'\n",
1525              dentry, dentry->d_count, *base, len, path);
1526         return path;
1527 }
1528
1529 static int build_dentry_path(struct dentry *dentry,
1530                              const char **ppath, int *ppathlen, u64 *pino,
1531                              int *pfreepath)
1532 {
1533         char *path;
1534
1535         if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1536                 *pino = ceph_ino(dentry->d_parent->d_inode);
1537                 *ppath = dentry->d_name.name;
1538                 *ppathlen = dentry->d_name.len;
1539                 return 0;
1540         }
1541         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1542         if (IS_ERR(path))
1543                 return PTR_ERR(path);
1544         *ppath = path;
1545         *pfreepath = 1;
1546         return 0;
1547 }
1548
1549 static int build_inode_path(struct inode *inode,
1550                             const char **ppath, int *ppathlen, u64 *pino,
1551                             int *pfreepath)
1552 {
1553         struct dentry *dentry;
1554         char *path;
1555
1556         if (ceph_snap(inode) == CEPH_NOSNAP) {
1557                 *pino = ceph_ino(inode);
1558                 *ppathlen = 0;
1559                 return 0;
1560         }
1561         dentry = d_find_alias(inode);
1562         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1563         dput(dentry);
1564         if (IS_ERR(path))
1565                 return PTR_ERR(path);
1566         *ppath = path;
1567         *pfreepath = 1;
1568         return 0;
1569 }
1570
1571 /*
1572  * request arguments may be specified via an inode *, a dentry *, or
1573  * an explicit ino+path.
1574  */
1575 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1576                                   const char *rpath, u64 rino,
1577                                   const char **ppath, int *pathlen,
1578                                   u64 *ino, int *freepath)
1579 {
1580         int r = 0;
1581
1582         if (rinode) {
1583                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1584                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1585                      ceph_snap(rinode));
1586         } else if (rdentry) {
1587                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1588                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1589                      *ppath);
1590         } else if (rpath || rino) {
1591                 *ino = rino;
1592                 *ppath = rpath;
1593                 *pathlen = strlen(rpath);
1594                 dout(" path %.*s\n", *pathlen, rpath);
1595         }
1596
1597         return r;
1598 }
1599
1600 /*
1601  * called under mdsc->mutex
1602  */
1603 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1604                                                struct ceph_mds_request *req,
1605                                                int mds)
1606 {
1607         struct ceph_msg *msg;
1608         struct ceph_mds_request_head *head;
1609         const char *path1 = NULL;
1610         const char *path2 = NULL;
1611         u64 ino1 = 0, ino2 = 0;
1612         int pathlen1 = 0, pathlen2 = 0;
1613         int freepath1 = 0, freepath2 = 0;
1614         int len;
1615         u16 releases;
1616         void *p, *end;
1617         int ret;
1618
1619         ret = set_request_path_attr(req->r_inode, req->r_dentry,
1620                               req->r_path1, req->r_ino1.ino,
1621                               &path1, &pathlen1, &ino1, &freepath1);
1622         if (ret < 0) {
1623                 msg = ERR_PTR(ret);
1624                 goto out;
1625         }
1626
1627         ret = set_request_path_attr(NULL, req->r_old_dentry,
1628                               req->r_path2, req->r_ino2.ino,
1629                               &path2, &pathlen2, &ino2, &freepath2);
1630         if (ret < 0) {
1631                 msg = ERR_PTR(ret);
1632                 goto out_free1;
1633         }
1634
1635         len = sizeof(*head) +
1636                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1637
1638         /* calculate (max) length for cap releases */
1639         len += sizeof(struct ceph_mds_request_release) *
1640                 (!!req->r_inode_drop + !!req->r_dentry_drop +
1641                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1642         if (req->r_dentry_drop)
1643                 len += req->r_dentry->d_name.len;
1644         if (req->r_old_dentry_drop)
1645                 len += req->r_old_dentry->d_name.len;
1646
1647         msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1648         if (!msg) {
1649                 msg = ERR_PTR(-ENOMEM);
1650                 goto out_free2;
1651         }
1652
1653         msg->hdr.tid = cpu_to_le64(req->r_tid);
1654
1655         head = msg->front.iov_base;
1656         p = msg->front.iov_base + sizeof(*head);
1657         end = msg->front.iov_base + msg->front.iov_len;
1658
1659         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1660         head->op = cpu_to_le32(req->r_op);
1661         head->caller_uid = cpu_to_le32(req->r_uid);
1662         head->caller_gid = cpu_to_le32(req->r_gid);
1663         head->args = req->r_args;
1664
1665         ceph_encode_filepath(&p, end, ino1, path1);
1666         ceph_encode_filepath(&p, end, ino2, path2);
1667
1668         /* make note of release offset, in case we need to replay */
1669         req->r_request_release_offset = p - msg->front.iov_base;
1670
1671         /* cap releases */
1672         releases = 0;
1673         if (req->r_inode_drop)
1674                 releases += ceph_encode_inode_release(&p,
1675                       req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1676                       mds, req->r_inode_drop, req->r_inode_unless, 0);
1677         if (req->r_dentry_drop)
1678                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1679                        mds, req->r_dentry_drop, req->r_dentry_unless);
1680         if (req->r_old_dentry_drop)
1681                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1682                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1683         if (req->r_old_inode_drop)
1684                 releases += ceph_encode_inode_release(&p,
1685                       req->r_old_dentry->d_inode,
1686                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1687         head->num_releases = cpu_to_le16(releases);
1688
1689         BUG_ON(p > end);
1690         msg->front.iov_len = p - msg->front.iov_base;
1691         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1692
1693         msg->pages = req->r_pages;
1694         msg->nr_pages = req->r_num_pages;
1695         msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1696         msg->hdr.data_off = cpu_to_le16(0);
1697
1698 out_free2:
1699         if (freepath2)
1700                 kfree((char *)path2);
1701 out_free1:
1702         if (freepath1)
1703                 kfree((char *)path1);
1704 out:
1705         return msg;
1706 }
1707
1708 /*
1709  * called under mdsc->mutex if error, under no mutex if
1710  * success.
1711  */
1712 static void complete_request(struct ceph_mds_client *mdsc,
1713                              struct ceph_mds_request *req)
1714 {
1715         if (req->r_callback)
1716                 req->r_callback(mdsc, req);
1717         else
1718                 complete_all(&req->r_completion);
1719 }
1720
1721 /*
1722  * called under mdsc->mutex
1723  */
1724 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1725                                   struct ceph_mds_request *req,
1726                                   int mds)
1727 {
1728         struct ceph_mds_request_head *rhead;
1729         struct ceph_msg *msg;
1730         int flags = 0;
1731
1732         req->r_attempts++;
1733         if (req->r_inode) {
1734                 struct ceph_cap *cap =
1735                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1736
1737                 if (cap)
1738                         req->r_sent_on_mseq = cap->mseq;
1739                 else
1740                         req->r_sent_on_mseq = -1;
1741         }
1742         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1743              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1744
1745         if (req->r_got_unsafe) {
1746                 /*
1747                  * Replay.  Do not regenerate message (and rebuild
1748                  * paths, etc.); just use the original message.
1749                  * Rebuilding paths will break for renames because
1750                  * d_move mangles the src name.
1751                  */
1752                 msg = req->r_request;
1753                 rhead = msg->front.iov_base;
1754
1755                 flags = le32_to_cpu(rhead->flags);
1756                 flags |= CEPH_MDS_FLAG_REPLAY;
1757                 rhead->flags = cpu_to_le32(flags);
1758
1759                 if (req->r_target_inode)
1760                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1761
1762                 rhead->num_retry = req->r_attempts - 1;
1763
1764                 /* remove cap/dentry releases from message */
1765                 rhead->num_releases = 0;
1766                 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1767                 msg->front.iov_len = req->r_request_release_offset;
1768                 return 0;
1769         }
1770
1771         if (req->r_request) {
1772                 ceph_msg_put(req->r_request);
1773                 req->r_request = NULL;
1774         }
1775         msg = create_request_message(mdsc, req, mds);
1776         if (IS_ERR(msg)) {
1777                 req->r_err = PTR_ERR(msg);
1778                 complete_request(mdsc, req);
1779                 return PTR_ERR(msg);
1780         }
1781         req->r_request = msg;
1782
1783         rhead = msg->front.iov_base;
1784         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1785         if (req->r_got_unsafe)
1786                 flags |= CEPH_MDS_FLAG_REPLAY;
1787         if (req->r_locked_dir)
1788                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1789         rhead->flags = cpu_to_le32(flags);
1790         rhead->num_fwd = req->r_num_fwd;
1791         rhead->num_retry = req->r_attempts - 1;
1792         rhead->ino = 0;
1793
1794         dout(" r_locked_dir = %p\n", req->r_locked_dir);
1795         return 0;
1796 }
1797
1798 /*
1799  * send request, or put it on the appropriate wait list.
1800  */
1801 static int __do_request(struct ceph_mds_client *mdsc,
1802                         struct ceph_mds_request *req)
1803 {
1804         struct ceph_mds_session *session = NULL;
1805         int mds = -1;
1806         int err = -EAGAIN;
1807
1808         if (req->r_err || req->r_got_result)
1809                 goto out;
1810
1811         if (req->r_timeout &&
1812             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1813                 dout("do_request timed out\n");
1814                 err = -EIO;
1815                 goto finish;
1816         }
1817
1818         put_request_session(req);
1819
1820         mds = __choose_mds(mdsc, req);
1821         if (mds < 0 ||
1822             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1823                 dout("do_request no mds or not active, waiting for map\n");
1824                 list_add(&req->r_wait, &mdsc->waiting_for_map);
1825                 goto out;
1826         }
1827
1828         /* get, open session */
1829         session = __ceph_lookup_mds_session(mdsc, mds);
1830         if (!session) {
1831                 session = register_session(mdsc, mds);
1832                 if (IS_ERR(session)) {
1833                         err = PTR_ERR(session);
1834                         goto finish;
1835                 }
1836         }
1837         req->r_session = get_session(session);
1838
1839         dout("do_request mds%d session %p state %s\n", mds, session,
1840              session_state_name(session->s_state));
1841         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1842             session->s_state != CEPH_MDS_SESSION_HUNG) {
1843                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1844                     session->s_state == CEPH_MDS_SESSION_CLOSING)
1845                         __open_session(mdsc, session);
1846                 list_add(&req->r_wait, &session->s_waiting);
1847                 goto out_session;
1848         }
1849
1850         /* send request */
1851         req->r_resend_mds = -1;   /* forget any previous mds hint */
1852
1853         if (req->r_request_started == 0)   /* note request start time */
1854                 req->r_request_started = jiffies;
1855
1856         err = __prepare_send_request(mdsc, req, mds);
1857         if (!err) {
1858                 ceph_msg_get(req->r_request);
1859                 ceph_con_send(&session->s_con, req->r_request);
1860         }
1861
1862 out_session:
1863         ceph_put_mds_session(session);
1864 out:
1865         return err;
1866
1867 finish:
1868         req->r_err = err;
1869         complete_request(mdsc, req);
1870         goto out;
1871 }
1872
1873 /*
1874  * called under mdsc->mutex
1875  */
1876 static void __wake_requests(struct ceph_mds_client *mdsc,
1877                             struct list_head *head)
1878 {
1879         struct ceph_mds_request *req, *nreq;
1880
1881         list_for_each_entry_safe(req, nreq, head, r_wait) {
1882                 list_del_init(&req->r_wait);
1883                 __do_request(mdsc, req);
1884         }
1885 }
1886
1887 /*
1888  * Wake up threads with requests pending for @mds, so that they can
1889  * resubmit their requests to a possibly different mds.
1890  */
1891 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1892 {
1893         struct ceph_mds_request *req;
1894         struct rb_node *p;
1895
1896         dout("kick_requests mds%d\n", mds);
1897         for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1898                 req = rb_entry(p, struct ceph_mds_request, r_node);
1899                 if (req->r_got_unsafe)
1900                         continue;
1901                 if (req->r_session &&
1902                     req->r_session->s_mds == mds) {
1903                         dout(" kicking tid %llu\n", req->r_tid);
1904                         __do_request(mdsc, req);
1905                 }
1906         }
1907 }
1908
1909 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1910                               struct ceph_mds_request *req)
1911 {
1912         dout("submit_request on %p\n", req);
1913         mutex_lock(&mdsc->mutex);
1914         __register_request(mdsc, req, NULL);
1915         __do_request(mdsc, req);
1916         mutex_unlock(&mdsc->mutex);
1917 }
1918
1919 /*
1920  * Synchrously perform an mds request.  Take care of all of the
1921  * session setup, forwarding, retry details.
1922  */
1923 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1924                          struct inode *dir,
1925                          struct ceph_mds_request *req)
1926 {
1927         int err;
1928
1929         dout("do_request on %p\n", req);
1930
1931         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1932         if (req->r_inode)
1933                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1934         if (req->r_locked_dir)
1935                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1936         if (req->r_old_dentry)
1937                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
1938                                   CEPH_CAP_PIN);
1939
1940         /* issue */
1941         mutex_lock(&mdsc->mutex);
1942         __register_request(mdsc, req, dir);
1943         __do_request(mdsc, req);
1944
1945         if (req->r_err) {
1946                 err = req->r_err;
1947                 __unregister_request(mdsc, req);
1948                 dout("do_request early error %d\n", err);
1949                 goto out;
1950         }
1951
1952         /* wait */
1953         mutex_unlock(&mdsc->mutex);
1954         dout("do_request waiting\n");
1955         if (req->r_timeout) {
1956                 err = (long)wait_for_completion_killable_timeout(
1957                         &req->r_completion, req->r_timeout);
1958                 if (err == 0)
1959                         err = -EIO;
1960         } else {
1961                 err = wait_for_completion_killable(&req->r_completion);
1962         }
1963         dout("do_request waited, got %d\n", err);
1964         mutex_lock(&mdsc->mutex);
1965
1966         /* only abort if we didn't race with a real reply */
1967         if (req->r_got_result) {
1968                 err = le32_to_cpu(req->r_reply_info.head->result);
1969         } else if (err < 0) {
1970                 dout("aborted request %lld with %d\n", req->r_tid, err);
1971
1972                 /*
1973                  * ensure we aren't running concurrently with
1974                  * ceph_fill_trace or ceph_readdir_prepopulate, which
1975                  * rely on locks (dir mutex) held by our caller.
1976                  */
1977                 mutex_lock(&req->r_fill_mutex);
1978                 req->r_err = err;
1979                 req->r_aborted = true;
1980                 mutex_unlock(&req->r_fill_mutex);
1981
1982                 if (req->r_locked_dir &&
1983                     (req->r_op & CEPH_MDS_OP_WRITE))
1984                         ceph_invalidate_dir_request(req);
1985         } else {
1986                 err = req->r_err;
1987         }
1988
1989 out:
1990         mutex_unlock(&mdsc->mutex);
1991         dout("do_request %p done, result %d\n", req, err);
1992         return err;
1993 }
1994
1995 /*
1996  * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS
1997  * namespace request.
1998  */
1999 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2000 {
2001         struct inode *inode = req->r_locked_dir;
2002         struct ceph_inode_info *ci = ceph_inode(inode);
2003
2004         dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode);
2005         spin_lock(&ci->i_ceph_lock);
2006         ceph_dir_clear_complete(inode);
2007         ci->i_release_count++;
2008         spin_unlock(&ci->i_ceph_lock);
2009
2010         if (req->r_dentry)
2011                 ceph_invalidate_dentry_lease(req->r_dentry);
2012         if (req->r_old_dentry)
2013                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2014 }
2015
2016 /*
2017  * Handle mds reply.
2018  *
2019  * We take the session mutex and parse and process the reply immediately.
2020  * This preserves the logical ordering of replies, capabilities, etc., sent
2021  * by the MDS as they are applied to our local cache.
2022  */
2023 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2024 {
2025         struct ceph_mds_client *mdsc = session->s_mdsc;
2026         struct ceph_mds_request *req;
2027         struct ceph_mds_reply_head *head = msg->front.iov_base;
2028         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2029         u64 tid;
2030         int err, result;
2031         int mds = session->s_mds;
2032
2033         if (msg->front.iov_len < sizeof(*head)) {
2034                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2035                 ceph_msg_dump(msg);
2036                 return;
2037         }
2038
2039         /* get request, session */
2040         tid = le64_to_cpu(msg->hdr.tid);
2041         mutex_lock(&mdsc->mutex);
2042         req = __lookup_request(mdsc, tid);
2043         if (!req) {
2044                 dout("handle_reply on unknown tid %llu\n", tid);
2045                 mutex_unlock(&mdsc->mutex);
2046                 return;
2047         }
2048         dout("handle_reply %p\n", req);
2049
2050         /* correct session? */
2051         if (req->r_session != session) {
2052                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2053                        " not mds%d\n", tid, session->s_mds,
2054                        req->r_session ? req->r_session->s_mds : -1);
2055                 mutex_unlock(&mdsc->mutex);
2056                 goto out;
2057         }
2058
2059         /* dup? */
2060         if ((req->r_got_unsafe && !head->safe) ||
2061             (req->r_got_safe && head->safe)) {
2062                 pr_warning("got a dup %s reply on %llu from mds%d\n",
2063                            head->safe ? "safe" : "unsafe", tid, mds);
2064                 mutex_unlock(&mdsc->mutex);
2065                 goto out;
2066         }
2067         if (req->r_got_safe && !head->safe) {
2068                 pr_warning("got unsafe after safe on %llu from mds%d\n",
2069                            tid, mds);
2070                 mutex_unlock(&mdsc->mutex);
2071                 goto out;
2072         }
2073
2074         result = le32_to_cpu(head->result);
2075
2076         /*
2077          * Handle an ESTALE
2078          * if we're not talking to the authority, send to them
2079          * if the authority has changed while we weren't looking,
2080          * send to new authority
2081          * Otherwise we just have to return an ESTALE
2082          */
2083         if (result == -ESTALE) {
2084                 dout("got ESTALE on request %llu", req->r_tid);
2085                 if (!req->r_inode) {
2086                         /* do nothing; not an authority problem */
2087                 } else if (req->r_direct_mode != USE_AUTH_MDS) {
2088                         dout("not using auth, setting for that now");
2089                         req->r_direct_mode = USE_AUTH_MDS;
2090                         __do_request(mdsc, req);
2091                         mutex_unlock(&mdsc->mutex);
2092                         goto out;
2093                 } else  {
2094                         struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2095                         struct ceph_cap *cap = NULL;
2096
2097                         if (req->r_session)
2098                                 cap = ceph_get_cap_for_mds(ci,
2099                                                    req->r_session->s_mds);
2100
2101                         dout("already using auth");
2102                         if ((!cap || cap != ci->i_auth_cap) ||
2103                             (cap->mseq != req->r_sent_on_mseq)) {
2104                                 dout("but cap changed, so resending");
2105                                 __do_request(mdsc, req);
2106                                 mutex_unlock(&mdsc->mutex);
2107                                 goto out;
2108                         }
2109                 }
2110                 dout("have to return ESTALE on request %llu", req->r_tid);
2111         }
2112
2113
2114         if (head->safe) {
2115                 req->r_got_safe = true;
2116                 __unregister_request(mdsc, req);
2117                 complete_all(&req->r_safe_completion);
2118
2119                 if (req->r_got_unsafe) {
2120                         /*
2121                          * We already handled the unsafe response, now do the
2122                          * cleanup.  No need to examine the response; the MDS
2123                          * doesn't include any result info in the safe
2124                          * response.  And even if it did, there is nothing
2125                          * useful we could do with a revised return value.
2126                          */
2127                         dout("got safe reply %llu, mds%d\n", tid, mds);
2128                         list_del_init(&req->r_unsafe_item);
2129
2130                         /* last unsafe request during umount? */
2131                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2132                                 complete_all(&mdsc->safe_umount_waiters);
2133                         mutex_unlock(&mdsc->mutex);
2134                         goto out;
2135                 }
2136         } else {
2137                 req->r_got_unsafe = true;
2138                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2139         }
2140
2141         dout("handle_reply tid %lld result %d\n", tid, result);
2142         rinfo = &req->r_reply_info;
2143         err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2144         mutex_unlock(&mdsc->mutex);
2145
2146         mutex_lock(&session->s_mutex);
2147         if (err < 0) {
2148                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2149                 ceph_msg_dump(msg);
2150                 goto out_err;
2151         }
2152
2153         /* snap trace */
2154         if (rinfo->snapblob_len) {
2155                 down_write(&mdsc->snap_rwsem);
2156                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2157                                rinfo->snapblob + rinfo->snapblob_len,
2158                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2159                 downgrade_write(&mdsc->snap_rwsem);
2160         } else {
2161                 down_read(&mdsc->snap_rwsem);
2162         }
2163
2164         /* insert trace into our cache */
2165         mutex_lock(&req->r_fill_mutex);
2166         err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2167         if (err == 0) {
2168                 if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2169                     rinfo->dir_nr)
2170                         ceph_readdir_prepopulate(req, req->r_session);
2171                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2172         }
2173         mutex_unlock(&req->r_fill_mutex);
2174
2175         up_read(&mdsc->snap_rwsem);
2176 out_err:
2177         mutex_lock(&mdsc->mutex);
2178         if (!req->r_aborted) {
2179                 if (err) {
2180                         req->r_err = err;
2181                 } else {
2182                         req->r_reply = msg;
2183                         ceph_msg_get(msg);
2184                         req->r_got_result = true;
2185                 }
2186         } else {
2187                 dout("reply arrived after request %lld was aborted\n", tid);
2188         }
2189         mutex_unlock(&mdsc->mutex);
2190
2191         ceph_add_cap_releases(mdsc, req->r_session);
2192         mutex_unlock(&session->s_mutex);
2193
2194         /* kick calling process */
2195         complete_request(mdsc, req);
2196 out:
2197         ceph_mdsc_put_request(req);
2198         return;
2199 }
2200
2201
2202
2203 /*
2204  * handle mds notification that our request has been forwarded.
2205  */
2206 static void handle_forward(struct ceph_mds_client *mdsc,
2207                            struct ceph_mds_session *session,
2208                            struct ceph_msg *msg)
2209 {
2210         struct ceph_mds_request *req;
2211         u64 tid = le64_to_cpu(msg->hdr.tid);
2212         u32 next_mds;
2213         u32 fwd_seq;
2214         int err = -EINVAL;
2215         void *p = msg->front.iov_base;
2216         void *end = p + msg->front.iov_len;
2217
2218         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2219         next_mds = ceph_decode_32(&p);
2220         fwd_seq = ceph_decode_32(&p);
2221
2222         mutex_lock(&mdsc->mutex);
2223         req = __lookup_request(mdsc, tid);
2224         if (!req) {
2225                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2226                 goto out;  /* dup reply? */
2227         }
2228
2229         if (req->r_aborted) {
2230                 dout("forward tid %llu aborted, unregistering\n", tid);
2231                 __unregister_request(mdsc, req);
2232         } else if (fwd_seq <= req->r_num_fwd) {
2233                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2234                      tid, next_mds, req->r_num_fwd, fwd_seq);
2235         } else {
2236                 /* resend. forward race not possible; mds would drop */
2237                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2238                 BUG_ON(req->r_err);
2239                 BUG_ON(req->r_got_result);
2240                 req->r_num_fwd = fwd_seq;
2241                 req->r_resend_mds = next_mds;
2242                 put_request_session(req);
2243                 __do_request(mdsc, req);
2244         }
2245         ceph_mdsc_put_request(req);
2246 out:
2247         mutex_unlock(&mdsc->mutex);
2248         return;
2249
2250 bad:
2251         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2252 }
2253
2254 /*
2255  * handle a mds session control message
2256  */
2257 static void handle_session(struct ceph_mds_session *session,
2258                            struct ceph_msg *msg)
2259 {
2260         struct ceph_mds_client *mdsc = session->s_mdsc;
2261         u32 op;
2262         u64 seq;
2263         int mds = session->s_mds;
2264         struct ceph_mds_session_head *h = msg->front.iov_base;
2265         int wake = 0;
2266
2267         /* decode */
2268         if (msg->front.iov_len != sizeof(*h))
2269                 goto bad;
2270         op = le32_to_cpu(h->op);
2271         seq = le64_to_cpu(h->seq);
2272
2273         mutex_lock(&mdsc->mutex);
2274         if (op == CEPH_SESSION_CLOSE)
2275                 __unregister_session(mdsc, session);
2276         /* FIXME: this ttl calculation is generous */
2277         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2278         mutex_unlock(&mdsc->mutex);
2279
2280         mutex_lock(&session->s_mutex);
2281
2282         dout("handle_session mds%d %s %p state %s seq %llu\n",
2283              mds, ceph_session_op_name(op), session,
2284              session_state_name(session->s_state), seq);
2285
2286         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2287                 session->s_state = CEPH_MDS_SESSION_OPEN;
2288                 pr_info("mds%d came back\n", session->s_mds);
2289         }
2290
2291         switch (op) {
2292         case CEPH_SESSION_OPEN:
2293                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2294                         pr_info("mds%d reconnect success\n", session->s_mds);
2295                 session->s_state = CEPH_MDS_SESSION_OPEN;
2296                 renewed_caps(mdsc, session, 0);
2297                 wake = 1;
2298                 if (mdsc->stopping)
2299                         __close_session(mdsc, session);
2300                 break;
2301
2302         case CEPH_SESSION_RENEWCAPS:
2303                 if (session->s_renew_seq == seq)
2304                         renewed_caps(mdsc, session, 1);
2305                 break;
2306
2307         case CEPH_SESSION_CLOSE:
2308                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2309                         pr_info("mds%d reconnect denied\n", session->s_mds);
2310                 remove_session_caps(session);
2311                 wake = 1; /* for good measure */
2312                 wake_up_all(&mdsc->session_close_wq);
2313                 kick_requests(mdsc, mds);
2314                 break;
2315
2316         case CEPH_SESSION_STALE:
2317                 pr_info("mds%d caps went stale, renewing\n",
2318                         session->s_mds);
2319                 spin_lock(&session->s_gen_ttl_lock);
2320                 session->s_cap_gen++;
2321                 session->s_cap_ttl = jiffies - 1;
2322                 spin_unlock(&session->s_gen_ttl_lock);
2323                 send_renew_caps(mdsc, session);
2324                 break;
2325
2326         case CEPH_SESSION_RECALL_STATE:
2327                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2328                 break;
2329
2330         default:
2331                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2332                 WARN_ON(1);
2333         }
2334
2335         mutex_unlock(&session->s_mutex);
2336         if (wake) {
2337                 mutex_lock(&mdsc->mutex);
2338                 __wake_requests(mdsc, &session->s_waiting);
2339                 mutex_unlock(&mdsc->mutex);
2340         }
2341         return;
2342
2343 bad:
2344         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2345                (int)msg->front.iov_len);
2346         ceph_msg_dump(msg);
2347         return;
2348 }
2349
2350
2351 /*
2352  * called under session->mutex.
2353  */
2354 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2355                                    struct ceph_mds_session *session)
2356 {
2357         struct ceph_mds_request *req, *nreq;
2358         int err;
2359
2360         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2361
2362         mutex_lock(&mdsc->mutex);
2363         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2364                 err = __prepare_send_request(mdsc, req, session->s_mds);
2365                 if (!err) {
2366                         ceph_msg_get(req->r_request);
2367                         ceph_con_send(&session->s_con, req->r_request);
2368                 }
2369         }
2370         mutex_unlock(&mdsc->mutex);
2371 }
2372
2373 /*
2374  * Encode information about a cap for a reconnect with the MDS.
2375  */
2376 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2377                           void *arg)
2378 {
2379         union {
2380                 struct ceph_mds_cap_reconnect v2;
2381                 struct ceph_mds_cap_reconnect_v1 v1;
2382         } rec;
2383         size_t reclen;
2384         struct ceph_inode_info *ci;
2385         struct ceph_reconnect_state *recon_state = arg;
2386         struct ceph_pagelist *pagelist = recon_state->pagelist;
2387         char *path;
2388         int pathlen, err;
2389         u64 pathbase;
2390         struct dentry *dentry;
2391
2392         ci = cap->ci;
2393
2394         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2395              inode, ceph_vinop(inode), cap, cap->cap_id,
2396              ceph_cap_string(cap->issued));
2397         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2398         if (err)
2399                 return err;
2400
2401         dentry = d_find_alias(inode);
2402         if (dentry) {
2403                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2404                 if (IS_ERR(path)) {
2405                         err = PTR_ERR(path);
2406                         goto out_dput;
2407                 }
2408         } else {
2409                 path = NULL;
2410                 pathlen = 0;
2411         }
2412         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2413         if (err)
2414                 goto out_free;
2415
2416         spin_lock(&ci->i_ceph_lock);
2417         cap->seq = 0;        /* reset cap seq */
2418         cap->issue_seq = 0;  /* and issue_seq */
2419
2420         if (recon_state->flock) {
2421                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2422                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2423                 rec.v2.issued = cpu_to_le32(cap->issued);
2424                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2425                 rec.v2.pathbase = cpu_to_le64(pathbase);
2426                 rec.v2.flock_len = 0;
2427                 reclen = sizeof(rec.v2);
2428         } else {
2429                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2430                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2431                 rec.v1.issued = cpu_to_le32(cap->issued);
2432                 rec.v1.size = cpu_to_le64(inode->i_size);
2433                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2434                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2435                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2436                 rec.v1.pathbase = cpu_to_le64(pathbase);
2437                 reclen = sizeof(rec.v1);
2438         }
2439         spin_unlock(&ci->i_ceph_lock);
2440
2441         if (recon_state->flock) {
2442                 int num_fcntl_locks, num_flock_locks;
2443                 struct ceph_pagelist_cursor trunc_point;
2444
2445                 ceph_pagelist_set_cursor(pagelist, &trunc_point);
2446                 do {
2447                         lock_flocks();
2448                         ceph_count_locks(inode, &num_fcntl_locks,
2449                                          &num_flock_locks);
2450                         rec.v2.flock_len = (2*sizeof(u32) +
2451                                             (num_fcntl_locks+num_flock_locks) *
2452                                             sizeof(struct ceph_filelock));
2453                         unlock_flocks();
2454
2455                         /* pre-alloc pagelist */
2456                         ceph_pagelist_truncate(pagelist, &trunc_point);
2457                         err = ceph_pagelist_append(pagelist, &rec, reclen);
2458                         if (!err)
2459                                 err = ceph_pagelist_reserve(pagelist,
2460                                                             rec.v2.flock_len);
2461
2462                         /* encode locks */
2463                         if (!err) {
2464                                 lock_flocks();
2465                                 err = ceph_encode_locks(inode,
2466                                                         pagelist,
2467                                                         num_fcntl_locks,
2468                                                         num_flock_locks);
2469                                 unlock_flocks();
2470                         }
2471                 } while (err == -ENOSPC);
2472         } else {
2473                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2474         }
2475
2476 out_free:
2477         kfree(path);
2478 out_dput:
2479         dput(dentry);
2480         return err;
2481 }
2482
2483
2484 /*
2485  * If an MDS fails and recovers, clients need to reconnect in order to
2486  * reestablish shared state.  This includes all caps issued through
2487  * this session _and_ the snap_realm hierarchy.  Because it's not
2488  * clear which snap realms the mds cares about, we send everything we
2489  * know about.. that ensures we'll then get any new info the
2490  * recovering MDS might have.
2491  *
2492  * This is a relatively heavyweight operation, but it's rare.
2493  *
2494  * called with mdsc->mutex held.
2495  */
2496 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2497                                struct ceph_mds_session *session)
2498 {
2499         struct ceph_msg *reply;
2500         struct rb_node *p;
2501         int mds = session->s_mds;
2502         int err = -ENOMEM;
2503         struct ceph_pagelist *pagelist;
2504         struct ceph_reconnect_state recon_state;
2505
2506         pr_info("mds%d reconnect start\n", mds);
2507
2508         pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2509         if (!pagelist)
2510                 goto fail_nopagelist;
2511         ceph_pagelist_init(pagelist);
2512
2513         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2514         if (!reply)
2515                 goto fail_nomsg;
2516
2517         mutex_lock(&session->s_mutex);
2518         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2519         session->s_seq = 0;
2520
2521         ceph_con_close(&session->s_con);
2522         ceph_con_open(&session->s_con,
2523                       CEPH_ENTITY_TYPE_MDS, mds,
2524                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2525
2526         /* replay unsafe requests */
2527         replay_unsafe_requests(mdsc, session);
2528
2529         down_read(&mdsc->snap_rwsem);
2530
2531         dout("session %p state %s\n", session,
2532              session_state_name(session->s_state));
2533
2534         /* drop old cap expires; we're about to reestablish that state */
2535         discard_cap_releases(mdsc, session);
2536
2537         /* traverse this session's caps */
2538         err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2539         if (err)
2540                 goto fail;
2541
2542         recon_state.pagelist = pagelist;
2543         recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2544         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2545         if (err < 0)
2546                 goto fail;
2547
2548         /*
2549          * snaprealms.  we provide mds with the ino, seq (version), and
2550          * parent for all of our realms.  If the mds has any newer info,
2551          * it will tell us.
2552          */
2553         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2554                 struct ceph_snap_realm *realm =
2555                         rb_entry(p, struct ceph_snap_realm, node);
2556                 struct ceph_mds_snaprealm_reconnect sr_rec;
2557
2558                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2559                      realm->ino, realm->seq, realm->parent_ino);
2560                 sr_rec.ino = cpu_to_le64(realm->ino);
2561                 sr_rec.seq = cpu_to_le64(realm->seq);
2562                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
2563                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2564                 if (err)
2565                         goto fail;
2566         }
2567
2568         reply->pagelist = pagelist;
2569         if (recon_state.flock)
2570                 reply->hdr.version = cpu_to_le16(2);
2571         reply->hdr.data_len = cpu_to_le32(pagelist->length);
2572         reply->nr_pages = calc_pages_for(0, pagelist->length);
2573         ceph_con_send(&session->s_con, reply);
2574
2575         mutex_unlock(&session->s_mutex);
2576
2577         mutex_lock(&mdsc->mutex);
2578         __wake_requests(mdsc, &session->s_waiting);
2579         mutex_unlock(&mdsc->mutex);
2580
2581         up_read(&mdsc->snap_rwsem);
2582         return;
2583
2584 fail:
2585         ceph_msg_put(reply);
2586         up_read(&mdsc->snap_rwsem);
2587         mutex_unlock(&session->s_mutex);
2588 fail_nomsg:
2589         ceph_pagelist_release(pagelist);
2590         kfree(pagelist);
2591 fail_nopagelist:
2592         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2593         return;
2594 }
2595
2596
2597 /*
2598  * compare old and new mdsmaps, kicking requests
2599  * and closing out old connections as necessary
2600  *
2601  * called under mdsc->mutex.
2602  */
2603 static void check_new_map(struct ceph_mds_client *mdsc,
2604                           struct ceph_mdsmap *newmap,
2605                           struct ceph_mdsmap *oldmap)
2606 {
2607         int i;
2608         int oldstate, newstate;
2609         struct ceph_mds_session *s;
2610
2611         dout("check_new_map new %u old %u\n",
2612              newmap->m_epoch, oldmap->m_epoch);
2613
2614         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2615                 if (mdsc->sessions[i] == NULL)
2616                         continue;
2617                 s = mdsc->sessions[i];
2618                 oldstate = ceph_mdsmap_get_state(oldmap, i);
2619                 newstate = ceph_mdsmap_get_state(newmap, i);
2620
2621                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2622                      i, ceph_mds_state_name(oldstate),
2623                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2624                      ceph_mds_state_name(newstate),
2625                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2626                      session_state_name(s->s_state));
2627
2628                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2629                            ceph_mdsmap_get_addr(newmap, i),
2630                            sizeof(struct ceph_entity_addr))) {
2631                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2632                                 /* the session never opened, just close it
2633                                  * out now */
2634                                 __wake_requests(mdsc, &s->s_waiting);
2635                                 __unregister_session(mdsc, s);
2636                         } else {
2637                                 /* just close it */
2638                                 mutex_unlock(&mdsc->mutex);
2639                                 mutex_lock(&s->s_mutex);
2640                                 mutex_lock(&mdsc->mutex);
2641                                 ceph_con_close(&s->s_con);
2642                                 mutex_unlock(&s->s_mutex);
2643                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2644                         }
2645
2646                         /* kick any requests waiting on the recovering mds */
2647                         kick_requests(mdsc, i);
2648                 } else if (oldstate == newstate) {
2649                         continue;  /* nothing new with this mds */
2650                 }
2651
2652                 /*
2653                  * send reconnect?
2654                  */
2655                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2656                     newstate >= CEPH_MDS_STATE_RECONNECT) {
2657                         mutex_unlock(&mdsc->mutex);
2658                         send_mds_reconnect(mdsc, s);
2659                         mutex_lock(&mdsc->mutex);
2660                 }
2661
2662                 /*
2663                  * kick request on any mds that has gone active.
2664                  */
2665                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2666                     newstate >= CEPH_MDS_STATE_ACTIVE) {
2667                         if (oldstate != CEPH_MDS_STATE_CREATING &&
2668                             oldstate != CEPH_MDS_STATE_STARTING)
2669                                 pr_info("mds%d recovery completed\n", s->s_mds);
2670                         kick_requests(mdsc, i);
2671                         ceph_kick_flushing_caps(mdsc, s);
2672                         wake_up_session_caps(s, 1);
2673                 }
2674         }
2675
2676         for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2677                 s = mdsc->sessions[i];
2678                 if (!s)
2679                         continue;
2680                 if (!ceph_mdsmap_is_laggy(newmap, i))
2681                         continue;
2682                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2683                     s->s_state == CEPH_MDS_SESSION_HUNG ||
2684                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
2685                         dout(" connecting to export targets of laggy mds%d\n",
2686                              i);
2687                         __open_export_target_sessions(mdsc, s);
2688                 }
2689         }
2690 }
2691
2692
2693
2694 /*
2695  * leases
2696  */
2697
2698 /*
2699  * caller must hold session s_mutex, dentry->d_lock
2700  */
2701 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2702 {
2703         struct ceph_dentry_info *di = ceph_dentry(dentry);
2704
2705         ceph_put_mds_session(di->lease_session);
2706         di->lease_session = NULL;
2707 }
2708
2709 static void handle_lease(struct ceph_mds_client *mdsc,
2710                          struct ceph_mds_session *session,
2711                          struct ceph_msg *msg)
2712 {
2713         struct super_block *sb = mdsc->fsc->sb;
2714         struct inode *inode;
2715         struct dentry *parent, *dentry;
2716         struct ceph_dentry_info *di;
2717         int mds = session->s_mds;
2718         struct ceph_mds_lease *h = msg->front.iov_base;
2719         u32 seq;
2720         struct ceph_vino vino;
2721         struct qstr dname;
2722         int release = 0;
2723
2724         dout("handle_lease from mds%d\n", mds);
2725
2726         /* decode */
2727         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2728                 goto bad;
2729         vino.ino = le64_to_cpu(h->ino);
2730         vino.snap = CEPH_NOSNAP;
2731         seq = le32_to_cpu(h->seq);
2732         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2733         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2734         if (dname.len != get_unaligned_le32(h+1))
2735                 goto bad;
2736
2737         mutex_lock(&session->s_mutex);
2738         session->s_seq++;
2739
2740         /* lookup inode */
2741         inode = ceph_find_inode(sb, vino);
2742         dout("handle_lease %s, ino %llx %p %.*s\n",
2743              ceph_lease_op_name(h->action), vino.ino, inode,
2744              dname.len, dname.name);
2745         if (inode == NULL) {
2746                 dout("handle_lease no inode %llx\n", vino.ino);
2747                 goto release;
2748         }
2749
2750         /* dentry */
2751         parent = d_find_alias(inode);
2752         if (!parent) {
2753                 dout("no parent dentry on inode %p\n", inode);
2754                 WARN_ON(1);
2755                 goto release;  /* hrm... */
2756         }
2757         dname.hash = full_name_hash(dname.name, dname.len);
2758         dentry = d_lookup(parent, &dname);
2759         dput(parent);
2760         if (!dentry)
2761                 goto release;
2762
2763         spin_lock(&dentry->d_lock);
2764         di = ceph_dentry(dentry);
2765         switch (h->action) {
2766         case CEPH_MDS_LEASE_REVOKE:
2767                 if (di->lease_session == session) {
2768                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2769                                 h->seq = cpu_to_le32(di->lease_seq);
2770                         __ceph_mdsc_drop_dentry_lease(dentry);
2771                 }
2772                 release = 1;
2773                 break;
2774
2775         case CEPH_MDS_LEASE_RENEW:
2776                 if (di->lease_session == session &&
2777                     di->lease_gen == session->s_cap_gen &&
2778                     di->lease_renew_from &&
2779                     di->lease_renew_after == 0) {
2780                         unsigned long duration =
2781                                 le32_to_cpu(h->duration_ms) * HZ / 1000;
2782
2783                         di->lease_seq = seq;
2784                         dentry->d_time = di->lease_renew_from + duration;
2785                         di->lease_renew_after = di->lease_renew_from +
2786                                 (duration >> 1);
2787                         di->lease_renew_from = 0;
2788                 }
2789                 break;
2790         }
2791         spin_unlock(&dentry->d_lock);
2792         dput(dentry);
2793
2794         if (!release)
2795                 goto out;
2796
2797 release:
2798         /* let's just reuse the same message */
2799         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2800         ceph_msg_get(msg);
2801         ceph_con_send(&session->s_con, msg);
2802
2803 out:
2804         iput(inode);
2805         mutex_unlock(&session->s_mutex);
2806         return;
2807
2808 bad:
2809         pr_err("corrupt lease message\n");
2810         ceph_msg_dump(msg);
2811 }
2812
2813 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2814                               struct inode *inode,
2815                               struct dentry *dentry, char action,
2816                               u32 seq)
2817 {
2818         struct ceph_msg *msg;
2819         struct ceph_mds_lease *lease;
2820         int len = sizeof(*lease) + sizeof(u32);
2821         int dnamelen = 0;
2822
2823         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2824              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2825         dnamelen = dentry->d_name.len;
2826         len += dnamelen;
2827
2828         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
2829         if (!msg)
2830                 return;
2831         lease = msg->front.iov_base;
2832         lease->action = action;
2833         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2834         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2835         lease->seq = cpu_to_le32(seq);
2836         put_unaligned_le32(dnamelen, lease + 1);
2837         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2838
2839         /*
2840          * if this is a preemptive lease RELEASE, no need to
2841          * flush request stream, since the actual request will
2842          * soon follow.
2843          */
2844         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2845
2846         ceph_con_send(&session->s_con, msg);
2847 }
2848
2849 /*
2850  * Preemptively release a lease we expect to invalidate anyway.
2851  * Pass @inode always, @dentry is optional.
2852  */
2853 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2854                              struct dentry *dentry)
2855 {
2856         struct ceph_dentry_info *di;
2857         struct ceph_mds_session *session;
2858         u32 seq;
2859
2860         BUG_ON(inode == NULL);
2861         BUG_ON(dentry == NULL);
2862
2863         /* is dentry lease valid? */
2864         spin_lock(&dentry->d_lock);
2865         di = ceph_dentry(dentry);
2866         if (!di || !di->lease_session ||
2867             di->lease_session->s_mds < 0 ||
2868             di->lease_gen != di->lease_session->s_cap_gen ||
2869             !time_before(jiffies, dentry->d_time)) {
2870                 dout("lease_release inode %p dentry %p -- "
2871                      "no lease\n",
2872                      inode, dentry);
2873                 spin_unlock(&dentry->d_lock);
2874                 return;
2875         }
2876
2877         /* we do have a lease on this dentry; note mds and seq */
2878         session = ceph_get_mds_session(di->lease_session);
2879         seq = di->lease_seq;
2880         __ceph_mdsc_drop_dentry_lease(dentry);
2881         spin_unlock(&dentry->d_lock);
2882
2883         dout("lease_release inode %p dentry %p to mds%d\n",
2884              inode, dentry, session->s_mds);
2885         ceph_mdsc_lease_send_msg(session, inode, dentry,
2886                                  CEPH_MDS_LEASE_RELEASE, seq);
2887         ceph_put_mds_session(session);
2888 }
2889
2890 /*
2891  * drop all leases (and dentry refs) in preparation for umount
2892  */
2893 static void drop_leases(struct ceph_mds_client *mdsc)
2894 {
2895         int i;
2896
2897         dout("drop_leases\n");
2898         mutex_lock(&mdsc->mutex);
2899         for (i = 0; i < mdsc->max_sessions; i++) {
2900                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2901                 if (!s)
2902                         continue;
2903                 mutex_unlock(&mdsc->mutex);
2904                 mutex_lock(&s->s_mutex);
2905                 mutex_unlock(&s->s_mutex);
2906                 ceph_put_mds_session(s);
2907                 mutex_lock(&mdsc->mutex);
2908         }
2909         mutex_unlock(&mdsc->mutex);
2910 }
2911
2912
2913
2914 /*
2915  * delayed work -- periodically trim expired leases, renew caps with mds
2916  */
2917 static void schedule_delayed(struct ceph_mds_client *mdsc)
2918 {
2919         int delay = 5;
2920         unsigned hz = round_jiffies_relative(HZ * delay);
2921         schedule_delayed_work(&mdsc->delayed_work, hz);
2922 }
2923
2924 static void delayed_work(struct work_struct *work)
2925 {
2926         int i;
2927         struct ceph_mds_client *mdsc =
2928                 container_of(work, struct ceph_mds_client, delayed_work.work);
2929         int renew_interval;
2930         int renew_caps;
2931
2932         dout("mdsc delayed_work\n");
2933         ceph_check_delayed_caps(mdsc);
2934
2935         mutex_lock(&mdsc->mutex);
2936         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2937         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2938                                    mdsc->last_renew_caps);
2939         if (renew_caps)
2940                 mdsc->last_renew_caps = jiffies;
2941
2942         for (i = 0; i < mdsc->max_sessions; i++) {
2943                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2944                 if (s == NULL)
2945                         continue;
2946                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2947                         dout("resending session close request for mds%d\n",
2948                              s->s_mds);
2949                         request_close_session(mdsc, s);
2950                         ceph_put_mds_session(s);
2951                         continue;
2952                 }
2953                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2954                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2955                                 s->s_state = CEPH_MDS_SESSION_HUNG;
2956                                 pr_info("mds%d hung\n", s->s_mds);
2957                         }
2958                 }
2959                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2960                         /* this mds is failed or recovering, just wait */
2961                         ceph_put_mds_session(s);
2962                         continue;
2963                 }
2964                 mutex_unlock(&mdsc->mutex);
2965
2966                 mutex_lock(&s->s_mutex);
2967                 if (renew_caps)
2968                         send_renew_caps(mdsc, s);
2969                 else
2970                         ceph_con_keepalive(&s->s_con);
2971                 ceph_add_cap_releases(mdsc, s);
2972                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2973                     s->s_state == CEPH_MDS_SESSION_HUNG)
2974                         ceph_send_cap_releases(mdsc, s);
2975                 mutex_unlock(&s->s_mutex);
2976                 ceph_put_mds_session(s);
2977
2978                 mutex_lock(&mdsc->mutex);
2979         }
2980         mutex_unlock(&mdsc->mutex);
2981
2982         schedule_delayed(mdsc);
2983 }
2984
2985 int ceph_mdsc_init(struct ceph_fs_client *fsc)
2986
2987 {
2988         struct ceph_mds_client *mdsc;
2989
2990         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2991         if (!mdsc)
2992                 return -ENOMEM;
2993         mdsc->fsc = fsc;
2994         fsc->mdsc = mdsc;
2995         mutex_init(&mdsc->mutex);
2996         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2997         if (mdsc->mdsmap == NULL)
2998                 return -ENOMEM;
2999
3000         init_completion(&mdsc->safe_umount_waiters);
3001         init_waitqueue_head(&mdsc->session_close_wq);
3002         INIT_LIST_HEAD(&mdsc->waiting_for_map);
3003         mdsc->sessions = NULL;
3004         mdsc->max_sessions = 0;
3005         mdsc->stopping = 0;
3006         init_rwsem(&mdsc->snap_rwsem);
3007         mdsc->snap_realms = RB_ROOT;
3008         INIT_LIST_HEAD(&mdsc->snap_empty);
3009         spin_lock_init(&mdsc->snap_empty_lock);
3010         mdsc->last_tid = 0;
3011         mdsc->request_tree = RB_ROOT;
3012         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3013         mdsc->last_renew_caps = jiffies;
3014         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3015         spin_lock_init(&mdsc->cap_delay_lock);
3016         INIT_LIST_HEAD(&mdsc->snap_flush_list);
3017         spin_lock_init(&mdsc->snap_flush_lock);
3018         mdsc->cap_flush_seq = 0;
3019         INIT_LIST_HEAD(&mdsc->cap_dirty);
3020         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3021         mdsc->num_cap_flushing = 0;
3022         spin_lock_init(&mdsc->cap_dirty_lock);
3023         init_waitqueue_head(&mdsc->cap_flushing_wq);
3024         spin_lock_init(&mdsc->dentry_lru_lock);
3025         INIT_LIST_HEAD(&mdsc->dentry_lru);
3026
3027         ceph_caps_init(mdsc);
3028         ceph_adjust_min_caps(mdsc, fsc->min_caps);
3029
3030         return 0;
3031 }
3032
3033 /*
3034  * Wait for safe replies on open mds requests.  If we time out, drop
3035  * all requests from the tree to avoid dangling dentry refs.
3036  */
3037 static void wait_requests(struct ceph_mds_client *mdsc)
3038 {
3039         struct ceph_mds_request *req;
3040         struct ceph_fs_client *fsc = mdsc->fsc;
3041
3042         mutex_lock(&mdsc->mutex);
3043         if (__get_oldest_req(mdsc)) {
3044                 mutex_unlock(&mdsc->mutex);
3045
3046                 dout("wait_requests waiting for requests\n");
3047                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3048                                     fsc->client->options->mount_timeout * HZ);
3049
3050                 /* tear down remaining requests */
3051                 mutex_lock(&mdsc->mutex);
3052                 while ((req = __get_oldest_req(mdsc))) {
3053                         dout("wait_requests timed out on tid %llu\n",
3054                              req->r_tid);
3055                         __unregister_request(mdsc, req);
3056                 }
3057         }
3058         mutex_unlock(&mdsc->mutex);
3059         dout("wait_requests done\n");
3060 }
3061
3062 /*
3063  * called before mount is ro, and before dentries are torn down.
3064  * (hmm, does this still race with new lookups?)
3065  */
3066 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3067 {
3068         dout("pre_umount\n");
3069         mdsc->stopping = 1;
3070
3071         drop_leases(mdsc);
3072         ceph_flush_dirty_caps(mdsc);
3073         wait_requests(mdsc);
3074
3075         /*
3076          * wait for reply handlers to drop their request refs and
3077          * their inode/dcache refs
3078          */
3079         ceph_msgr_flush();
3080 }
3081
3082 /*
3083  * wait for all write mds requests to flush.
3084  */
3085 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3086 {
3087         struct ceph_mds_request *req = NULL, *nextreq;
3088         struct rb_node *n;
3089
3090         mutex_lock(&mdsc->mutex);
3091         dout("wait_unsafe_requests want %lld\n", want_tid);
3092 restart:
3093         req = __get_oldest_req(mdsc);
3094         while (req && req->r_tid <= want_tid) {
3095                 /* find next request */
3096                 n = rb_next(&req->r_node);
3097                 if (n)
3098                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3099                 else
3100                         nextreq = NULL;
3101                 if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3102                         /* write op */
3103                         ceph_mdsc_get_request(req);
3104                         if (nextreq)
3105                                 ceph_mdsc_get_request(nextreq);
3106                         mutex_unlock(&mdsc->mutex);
3107                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3108                              req->r_tid, want_tid);
3109                         wait_for_completion(&req->r_safe_completion);
3110                         mutex_lock(&mdsc->mutex);
3111                         ceph_mdsc_put_request(req);
3112                         if (!nextreq)
3113                                 break;  /* next dne before, so we're done! */
3114                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
3115                                 /* next request was removed from tree */
3116                                 ceph_mdsc_put_request(nextreq);
3117                                 goto restart;
3118                         }
3119                         ceph_mdsc_put_request(nextreq);  /* won't go away */
3120                 }
3121                 req = nextreq;
3122         }
3123         mutex_unlock(&mdsc->mutex);
3124         dout("wait_unsafe_requests done\n");
3125 }
3126
3127 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3128 {
3129         u64 want_tid, want_flush;
3130
3131         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3132                 return;
3133
3134         dout("sync\n");
3135         mutex_lock(&mdsc->mutex);
3136         want_tid = mdsc->last_tid;
3137         want_flush = mdsc->cap_flush_seq;
3138         mutex_unlock(&mdsc->mutex);
3139         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3140
3141         ceph_flush_dirty_caps(mdsc);
3142
3143         wait_unsafe_requests(mdsc, want_tid);
3144         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3145 }
3146
3147 /*
3148  * true if all sessions are closed, or we force unmount
3149  */
3150 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3151 {
3152         int i, n = 0;
3153
3154         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3155                 return true;
3156
3157         mutex_lock(&mdsc->mutex);
3158         for (i = 0; i < mdsc->max_sessions; i++)
3159                 if (mdsc->sessions[i])
3160                         n++;
3161         mutex_unlock(&mdsc->mutex);
3162         return n == 0;
3163 }
3164
3165 /*
3166  * called after sb is ro.
3167  */
3168 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3169 {
3170         struct ceph_mds_session *session;
3171         int i;
3172         struct ceph_fs_client *fsc = mdsc->fsc;
3173         unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3174
3175         dout("close_sessions\n");
3176
3177         /* close sessions */
3178         mutex_lock(&mdsc->mutex);
3179         for (i = 0; i < mdsc->max_sessions; i++) {
3180                 session = __ceph_lookup_mds_session(mdsc, i);
3181                 if (!session)
3182                         continue;
3183                 mutex_unlock(&mdsc->mutex);
3184                 mutex_lock(&session->s_mutex);
3185                 __close_session(mdsc, session);
3186                 mutex_unlock(&session->s_mutex);
3187                 ceph_put_mds_session(session);
3188                 mutex_lock(&mdsc->mutex);
3189         }
3190         mutex_unlock(&mdsc->mutex);
3191
3192         dout("waiting for sessions to close\n");
3193         wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3194                            timeout);
3195
3196         /* tear down remaining sessions */
3197         mutex_lock(&mdsc->mutex);
3198         for (i = 0; i < mdsc->max_sessions; i++) {
3199                 if (mdsc->sessions[i]) {
3200                         session = get_session(mdsc->sessions[i]);
3201                         __unregister_session(mdsc, session);
3202                         mutex_unlock(&mdsc->mutex);
3203                         mutex_lock(&session->s_mutex);
3204                         remove_session_caps(session);
3205                         mutex_unlock(&session->s_mutex);
3206                         ceph_put_mds_session(session);
3207                         mutex_lock(&mdsc->mutex);
3208                 }
3209         }
3210         WARN_ON(!list_empty(&mdsc->cap_delay_list));
3211         mutex_unlock(&mdsc->mutex);
3212
3213         ceph_cleanup_empty_realms(mdsc);
3214
3215         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3216
3217         dout("stopped\n");
3218 }
3219
3220 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3221 {
3222         dout("stop\n");
3223         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3224         if (mdsc->mdsmap)
3225                 ceph_mdsmap_destroy(mdsc->mdsmap);
3226         kfree(mdsc->sessions);
3227         ceph_caps_finalize(mdsc);
3228 }
3229
3230 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3231 {
3232         struct ceph_mds_client *mdsc = fsc->mdsc;
3233
3234         dout("mdsc_destroy %p\n", mdsc);
3235         ceph_mdsc_stop(mdsc);
3236
3237         /* flush out any connection work with references to us */
3238         ceph_msgr_flush();
3239
3240         fsc->mdsc = NULL;
3241         kfree(mdsc);
3242         dout("mdsc_destroy %p done\n", mdsc);
3243 }
3244
3245
3246 /*
3247  * handle mds map update.
3248  */
3249 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3250 {
3251         u32 epoch;
3252         u32 maplen;
3253         void *p = msg->front.iov_base;
3254         void *end = p + msg->front.iov_len;
3255         struct ceph_mdsmap *newmap, *oldmap;
3256         struct ceph_fsid fsid;
3257         int err = -EINVAL;
3258
3259         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3260         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3261         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3262                 return;
3263         epoch = ceph_decode_32(&p);
3264         maplen = ceph_decode_32(&p);
3265         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3266
3267         /* do we need it? */
3268         ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3269         mutex_lock(&mdsc->mutex);
3270         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3271                 dout("handle_map epoch %u <= our %u\n",
3272                      epoch, mdsc->mdsmap->m_epoch);
3273                 mutex_unlock(&mdsc->mutex);
3274                 return;
3275         }
3276
3277         newmap = ceph_mdsmap_decode(&p, end);
3278         if (IS_ERR(newmap)) {
3279                 err = PTR_ERR(newmap);
3280                 goto bad_unlock;
3281         }
3282
3283         /* swap into place */
3284         if (mdsc->mdsmap) {
3285                 oldmap = mdsc->mdsmap;
3286                 mdsc->mdsmap = newmap;
3287                 check_new_map(mdsc, newmap, oldmap);
3288                 ceph_mdsmap_destroy(oldmap);
3289         } else {
3290                 mdsc->mdsmap = newmap;  /* first mds map */
3291         }
3292         mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3293
3294         __wake_requests(mdsc, &mdsc->waiting_for_map);
3295
3296         mutex_unlock(&mdsc->mutex);
3297         schedule_delayed(mdsc);
3298         return;
3299
3300 bad_unlock:
3301         mutex_unlock(&mdsc->mutex);
3302 bad:
3303         pr_err("error decoding mdsmap %d\n", err);
3304         return;
3305 }
3306
3307 static struct ceph_connection *con_get(struct ceph_connection *con)
3308 {
3309         struct ceph_mds_session *s = con->private;
3310
3311         if (get_session(s)) {
3312                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3313                 return con;
3314         }
3315         dout("mdsc con_get %p FAIL\n", s);
3316         return NULL;
3317 }
3318
3319 static void con_put(struct ceph_connection *con)
3320 {
3321         struct ceph_mds_session *s = con->private;
3322
3323         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3324         ceph_put_mds_session(s);
3325 }
3326
3327 /*
3328  * if the client is unresponsive for long enough, the mds will kill
3329  * the session entirely.
3330  */
3331 static void peer_reset(struct ceph_connection *con)
3332 {
3333         struct ceph_mds_session *s = con->private;
3334         struct ceph_mds_client *mdsc = s->s_mdsc;
3335
3336         pr_warning("mds%d closed our session\n", s->s_mds);
3337         send_mds_reconnect(mdsc, s);
3338 }
3339
3340 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3341 {
3342         struct ceph_mds_session *s = con->private;
3343         struct ceph_mds_client *mdsc = s->s_mdsc;
3344         int type = le16_to_cpu(msg->hdr.type);
3345
3346         mutex_lock(&mdsc->mutex);
3347         if (__verify_registered_session(mdsc, s) < 0) {
3348                 mutex_unlock(&mdsc->mutex);
3349                 goto out;
3350         }
3351         mutex_unlock(&mdsc->mutex);
3352
3353         switch (type) {
3354         case CEPH_MSG_MDS_MAP:
3355                 ceph_mdsc_handle_map(mdsc, msg);
3356                 break;
3357         case CEPH_MSG_CLIENT_SESSION:
3358                 handle_session(s, msg);
3359                 break;
3360         case CEPH_MSG_CLIENT_REPLY:
3361                 handle_reply(s, msg);
3362                 break;
3363         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3364                 handle_forward(mdsc, s, msg);
3365                 break;
3366         case CEPH_MSG_CLIENT_CAPS:
3367                 ceph_handle_caps(s, msg);
3368                 break;
3369         case CEPH_MSG_CLIENT_SNAP:
3370                 ceph_handle_snap(mdsc, s, msg);
3371                 break;
3372         case CEPH_MSG_CLIENT_LEASE:
3373                 handle_lease(mdsc, s, msg);
3374                 break;
3375
3376         default:
3377                 pr_err("received unknown message type %d %s\n", type,
3378                        ceph_msg_type_name(type));
3379         }
3380 out:
3381         ceph_msg_put(msg);
3382 }
3383
3384 /*
3385  * authentication
3386  */
3387
3388 /*
3389  * Note: returned pointer is the address of a structure that's
3390  * managed separately.  Caller must *not* attempt to free it.
3391  */
3392 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3393                                         int *proto, int force_new)
3394 {
3395         struct ceph_mds_session *s = con->private;
3396         struct ceph_mds_client *mdsc = s->s_mdsc;
3397         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3398         struct ceph_auth_handshake *auth = &s->s_auth;
3399
3400         if (force_new && auth->authorizer) {
3401                 if (ac->ops && ac->ops->destroy_authorizer)
3402                         ac->ops->destroy_authorizer(ac, auth->authorizer);
3403                 auth->authorizer = NULL;
3404         }
3405         if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
3406                 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3407                                                         auth);
3408                 if (ret)
3409                         return ERR_PTR(ret);
3410         }
3411         *proto = ac->protocol;
3412
3413         return auth;
3414 }
3415
3416
3417 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3418 {
3419         struct ceph_mds_session *s = con->private;
3420         struct ceph_mds_client *mdsc = s->s_mdsc;
3421         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3422
3423         return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3424 }
3425
3426 static int invalidate_authorizer(struct ceph_connection *con)
3427 {
3428         struct ceph_mds_session *s = con->private;
3429         struct ceph_mds_client *mdsc = s->s_mdsc;
3430         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3431
3432         if (ac->ops->invalidate_authorizer)
3433                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3434
3435         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3436 }
3437
3438 static const struct ceph_connection_operations mds_con_ops = {
3439         .get = con_get,
3440         .put = con_put,
3441         .dispatch = dispatch,
3442         .get_authorizer = get_authorizer,
3443         .verify_authorizer_reply = verify_authorizer_reply,
3444         .invalidate_authorizer = invalidate_authorizer,
3445         .peer_reset = peer_reset,
3446 };
3447
3448 /* eof */