ceph: wake up 'safe' waiters when unregistering request
[firefly-linux-kernel-4.4.55.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9
10 #include "super.h"
11 #include "mds_client.h"
12
13 #include <linux/ceph/ceph_features.h>
14 #include <linux/ceph/messenger.h>
15 #include <linux/ceph/decode.h>
16 #include <linux/ceph/pagelist.h>
17 #include <linux/ceph/auth.h>
18 #include <linux/ceph/debugfs.h>
19
20 /*
21  * A cluster of MDS (metadata server) daemons is responsible for
22  * managing the file system namespace (the directory hierarchy and
23  * inodes) and for coordinating shared access to storage.  Metadata is
24  * partitioned hierarchically across a number of servers, and that
25  * partition varies over time as the cluster adjusts the distribution
26  * in order to balance load.
27  *
28  * The MDS client is primarily responsible for managing synchronous
29  * metadata requests for operations like open, unlink, and so forth.
30  * If there is a MDS failure, we find out about it when we (possibly
31  * request and) receive a new MDS map, and can resubmit affected
32  * requests.
33  *
34  * For the most part, though, we take advantage of a lossless
35  * communications channel to the MDS, and do not need to worry about
36  * timing out or resubmitting requests.
37  *
38  * We maintain a stateful "session" with each MDS we interact with.
39  * Within each session, we send periodic heartbeat messages to ensure
40  * any capabilities or leases we have been issued remain valid.  If
41  * the session times out and goes stale, our leases and capabilities
42  * are no longer valid.
43  */
44
/*
 * State passed between the cap-reconnect encoding helpers.
 * NOTE(review): 'flock' appears to indicate that the peer understands
 * file-lock records in the reconnect message — confirm against the
 * users of this struct (not visible in this chunk).
 */
struct ceph_reconnect_state {
        struct ceph_pagelist *pagelist; /* reconnect payload being built */
        bool flock;
};
49
50 static void __wake_requests(struct ceph_mds_client *mdsc,
51                             struct list_head *head);
52
53 static const struct ceph_connection_operations mds_con_ops;
54
55
56 /*
57  * mds reply parsing
58  */
59
60 /*
61  * parse individual inode info
62  */
63 static int parse_reply_info_in(void **p, void *end,
64                                struct ceph_mds_reply_info_in *info,
65                                int features)
66 {
67         int err = -EIO;
68
69         info->in = *p;
70         *p += sizeof(struct ceph_mds_reply_inode) +
71                 sizeof(*info->in->fragtree.splits) *
72                 le32_to_cpu(info->in->fragtree.nsplits);
73
74         ceph_decode_32_safe(p, end, info->symlink_len, bad);
75         ceph_decode_need(p, end, info->symlink_len, bad);
76         info->symlink = *p;
77         *p += info->symlink_len;
78
79         if (features & CEPH_FEATURE_DIRLAYOUTHASH)
80                 ceph_decode_copy_safe(p, end, &info->dir_layout,
81                                       sizeof(info->dir_layout), bad);
82         else
83                 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
84
85         ceph_decode_32_safe(p, end, info->xattr_len, bad);
86         ceph_decode_need(p, end, info->xattr_len, bad);
87         info->xattr_data = *p;
88         *p += info->xattr_len;
89         return 0;
90 bad:
91         return err;
92 }
93
94 /*
95  * parse a normal reply, which may contain a (dir+)dentry and/or a
96  * target inode.
97  */
98 static int parse_reply_info_trace(void **p, void *end,
99                                   struct ceph_mds_reply_info_parsed *info,
100                                   int features)
101 {
102         int err;
103
104         if (info->head->is_dentry) {
105                 err = parse_reply_info_in(p, end, &info->diri, features);
106                 if (err < 0)
107                         goto out_bad;
108
109                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
110                         goto bad;
111                 info->dirfrag = *p;
112                 *p += sizeof(*info->dirfrag) +
113                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
114                 if (unlikely(*p > end))
115                         goto bad;
116
117                 ceph_decode_32_safe(p, end, info->dname_len, bad);
118                 ceph_decode_need(p, end, info->dname_len, bad);
119                 info->dname = *p;
120                 *p += info->dname_len;
121                 info->dlease = *p;
122                 *p += sizeof(*info->dlease);
123         }
124
125         if (info->head->is_target) {
126                 err = parse_reply_info_in(p, end, &info->targeti, features);
127                 if (err < 0)
128                         goto out_bad;
129         }
130
131         if (unlikely(*p != end))
132                 goto bad;
133         return 0;
134
135 bad:
136         err = -EIO;
137 out_bad:
138         pr_err("problem parsing mds trace %d\n", err);
139         return err;
140 }
141
142 /*
143  * parse readdir results
144  */
/*
 * parse readdir results
 *
 * Decodes the dirfrag header and 'num' (dname, lease, inode) entries.
 * All per-entry arrays are carved out of one kcalloc allocation
 * anchored at info->dir_in, and are therefore freed together by
 * destroy_reply_info().  Returns 0, -ENOMEM, or -EIO.
 */
static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                int features)
{
        u32 num, i = 0;
        int err;

        info->dir_dir = *p;
        if (*p + sizeof(*info->dir_dir) > end)
                goto bad;
        *p += sizeof(*info->dir_dir) +
                sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
        if (*p > end)
                goto bad;

        /* entry count plus the dir_end/dir_complete flag bytes */
        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        info->dir_end = ceph_decode_8(p);
        info->dir_complete = ceph_decode_8(p);
        if (num == 0)
                goto done;

        /* alloc large array */
        info->dir_nr = num;
        info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
                               sizeof(*info->dir_dname) +
                               sizeof(*info->dir_dname_len) +
                               sizeof(*info->dir_dlease),
                               GFP_NOFS);
        if (info->dir_in == NULL) {
                err = -ENOMEM;
                goto out_bad;
        }
        /* the three auxiliary arrays live past the dir_in array */
        info->dir_dname = (void *)(info->dir_in + num);
        info->dir_dname_len = (void *)(info->dir_dname + num);
        info->dir_dlease = (void *)(info->dir_dname_len + num);

        while (num) {
                /* dentry */
                ceph_decode_need(p, end, sizeof(u32)*2, bad);
                info->dir_dname_len[i] = ceph_decode_32(p);
                ceph_decode_need(p, end, info->dir_dname_len[i], bad);
                info->dir_dname[i] = *p;
                *p += info->dir_dname_len[i];
                dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
                     info->dir_dname[i]);
                /* NOTE(review): the lease advance below is not itself
                 * bounds-checked; an overrun is presumably caught by the
                 * per-entry decode macros or the final *p != end check
                 * before the lease is ever read — confirm */
                info->dir_dlease[i] = *p;
                *p += sizeof(struct ceph_mds_reply_lease);

                /* inode */
                err = parse_reply_info_in(p, end, &info->dir_in[i], features);
                if (err < 0)
                        goto out_bad;
                i++;
                num--;
        }

done:
        /* the dir section must be consumed exactly */
        if (*p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}
213
214 /*
215  * parse fcntl F_GETLK results
216  */
217 static int parse_reply_info_filelock(void **p, void *end,
218                                      struct ceph_mds_reply_info_parsed *info,
219                                      int features)
220 {
221         if (*p + sizeof(*info->filelock_reply) > end)
222                 goto bad;
223
224         info->filelock_reply = *p;
225         *p += sizeof(*info->filelock_reply);
226
227         if (unlikely(*p != end))
228                 goto bad;
229         return 0;
230
231 bad:
232         return -EIO;
233 }
234
235 /*
236  * parse create results
237  */
238 static int parse_reply_info_create(void **p, void *end,
239                                   struct ceph_mds_reply_info_parsed *info,
240                                   int features)
241 {
242         if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
243                 if (*p == end) {
244                         info->has_create_ino = false;
245                 } else {
246                         info->has_create_ino = true;
247                         info->ino = ceph_decode_64(p);
248                 }
249         }
250
251         if (unlikely(*p != end))
252                 goto bad;
253         return 0;
254
255 bad:
256         return -EIO;
257 }
258
259 /*
260  * parse extra results
261  */
262 static int parse_reply_info_extra(void **p, void *end,
263                                   struct ceph_mds_reply_info_parsed *info,
264                                   int features)
265 {
266         if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
267                 return parse_reply_info_filelock(p, end, info, features);
268         else if (info->head->op == CEPH_MDS_OP_READDIR ||
269                  info->head->op == CEPH_MDS_OP_LSSNAP)
270                 return parse_reply_info_dir(p, end, info, features);
271         else if (info->head->op == CEPH_MDS_OP_CREATE)
272                 return parse_reply_info_create(p, end, info, features);
273         else
274                 return -EIO;
275 }
276
277 /*
278  * parse entire mds reply
279  */
/*
 * parse entire mds reply
 *
 * A reply is three length-prefixed sections after the fixed header:
 * the trace (dentry/inode metadata), op-specific "extra" data, and
 * the snap blob, which is only recorded (pointer + length) here, not
 * decoded.  Returns 0 or a negative error.
 */
static int parse_reply_info(struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            int features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                /* each sub-parser sees only its own section: end = p+len */
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        /* the message must be consumed exactly */
        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}
326
/*
 * Free the readdir entry arrays allocated by parse_reply_info_dir();
 * dir_dname/dir_dname_len/dir_dlease are carved out of this same
 * allocation, and all other info fields point into the reply message,
 * so this single kfree releases everything.
 */
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        kfree(info->dir_in);
}
331
332
333 /*
334  * sessions
335  */
336 static const char *session_state_name(int s)
337 {
338         switch (s) {
339         case CEPH_MDS_SESSION_NEW: return "new";
340         case CEPH_MDS_SESSION_OPENING: return "opening";
341         case CEPH_MDS_SESSION_OPEN: return "open";
342         case CEPH_MDS_SESSION_HUNG: return "hung";
343         case CEPH_MDS_SESSION_CLOSING: return "closing";
344         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
345         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
346         default: return "???";
347         }
348 }
349
350 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
351 {
352         if (atomic_inc_not_zero(&s->s_ref)) {
353                 dout("mdsc get_session %p %d -> %d\n", s,
354                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
355                 return s;
356         } else {
357                 dout("mdsc get_session %p 0 -- FAIL", s);
358                 return NULL;
359         }
360 }
361
/*
 * Drop a session reference; on the final put, destroy the auth
 * authorizer (if any) and free the session.
 *
 * NOTE(review): the dout reads s_ref non-atomically relative to the
 * decrement, so the printed before/after values are approximate
 * under concurrency.
 */
void ceph_put_mds_session(struct ceph_mds_session *s)
{
        dout("mdsc put_session %p %d -> %d\n", s,
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(
                                s->s_mdsc->fsc->client->monc.auth,
                                s->s_auth.authorizer);
                kfree(s);
        }
}
374
375 /*
376  * called under mdsc->mutex
377  */
378 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
379                                                    int mds)
380 {
381         struct ceph_mds_session *session;
382
383         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
384                 return NULL;
385         session = mdsc->sessions[mds];
386         dout("lookup_mds_session %p %d\n", session,
387              atomic_read(&session->s_ref));
388         get_session(session);
389         return session;
390 }
391
392 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
393 {
394         if (mds >= mdsc->max_sessions)
395                 return false;
396         return mdsc->sessions[mds];
397 }
398
399 static int __verify_registered_session(struct ceph_mds_client *mdsc,
400                                        struct ceph_mds_session *s)
401 {
402         if (s->s_mds >= mdsc->max_sessions ||
403             mdsc->sessions[s->s_mds] != s)
404                 return -ENOENT;
405         return 0;
406 }
407
408 /*
409  * create+register a new session for given mds.
410  * called under mdsc->mutex.
411  */
/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 *
 * Returns the new session with a reference held for the caller (a
 * second reference is owned by mdsc->sessions[]), or ERR_PTR(-ENOMEM).
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 0;
        s->s_cap_ttl = jiffies - 1;     /* already in the past */

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        s->s_trim_caps = 0;
        atomic_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);

        dout("register_session mds%d\n", mds);
        if (mds >= mdsc->max_sessions) {
                /* grow sessions[] to the next power of two that fits mds */
                int newmax = 1 << get_count_order(mds+1);
                struct ceph_mds_session **sa;

                dout("register_session realloc to %d\n", newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (sa == NULL)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }
        mdsc->sessions[mds] = s;
        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}
478
479 /*
480  * called under mdsc->mutex
481  */
/*
 * called under mdsc->mutex
 *
 * Drop the session from mdsc->sessions[], close its connection, and
 * drop the reference that sessions[] held.  Callers may still hold
 * their own references to the session.
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
}
491
492 /*
493  * drop session refs in request.
494  *
495  * should be last request ref, or hold mdsc->mutex
496  */
497 static void put_request_session(struct ceph_mds_request *req)
498 {
499         if (req->r_session) {
500                 ceph_put_mds_session(req->r_session);
501                 req->r_session = NULL;
502         }
503 }
504
/*
 * Final teardown of a request, called via kref_put once the last
 * reference is dropped: release message refs, drop the CEPH_CAP_PIN
 * references taken on the inodes involved, release dentry/inode refs,
 * free the paths and the cap reservation, then the request itself.
 */
void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply) {
                ceph_msg_put(req->r_reply);
                destroy_reply_info(&req->r_reply_info);
        }
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
        if (req->r_locked_dir)
                ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
        if (req->r_target_inode)
                iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                dput(req->r_old_dentry);
                iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
}
544
545 /*
546  * lookup request, bump ref if found.
547  *
548  * called under mdsc->mutex.
549  */
550 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
551                                              u64 tid)
552 {
553         struct ceph_mds_request *req;
554         struct rb_node *n = mdsc->request_tree.rb_node;
555
556         while (n) {
557                 req = rb_entry(n, struct ceph_mds_request, r_node);
558                 if (tid < req->r_tid)
559                         n = n->rb_left;
560                 else if (tid > req->r_tid)
561                         n = n->rb_right;
562                 else {
563                         ceph_mdsc_get_request(req);
564                         return req;
565                 }
566         }
567         return NULL;
568 }
569
570 static void __insert_request(struct ceph_mds_client *mdsc,
571                              struct ceph_mds_request *new)
572 {
573         struct rb_node **p = &mdsc->request_tree.rb_node;
574         struct rb_node *parent = NULL;
575         struct ceph_mds_request *req = NULL;
576
577         while (*p) {
578                 parent = *p;
579                 req = rb_entry(parent, struct ceph_mds_request, r_node);
580                 if (new->r_tid < req->r_tid)
581                         p = &(*p)->rb_left;
582                 else if (new->r_tid > req->r_tid)
583                         p = &(*p)->rb_right;
584                 else
585                         BUG();
586         }
587
588         rb_link_node(&new->r_node, parent, p);
589         rb_insert_color(&new->r_node, &mdsc->request_tree);
590 }
591
592 /*
593  * Register an in-flight request, and assign a tid.  Link to the
594  * directory we are modifying (if any).
595  *
596  * Called under mdsc->mutex.
597  */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        /* assign the next tid and insert into the request tree */
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
                ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                  req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);     /* reference owned by the tree */
        __insert_request(mdsc, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);

                /* hold the dir and track this op on its unsafe list
                 * until __unregister_request() */
                ihold(dir);
                spin_lock(&ci->i_unsafe_lock);
                req->r_unsafe_dir = dir;
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
                spin_unlock(&ci->i_unsafe_lock);
        }
}
623
/*
 * Remove an in-flight request from the tid tree and the parent dir's
 * unsafe list, wake anyone waiting for it to become 'safe', and drop
 * the tree's reference.
 *
 * NOTE(review): presumably called under mdsc->mutex like
 * __register_request() — confirm against callers.
 */
static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &mdsc->request_tree);
        RB_CLEAR_NODE(&req->r_node);

        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);

                /* drop the ihold taken in __register_request() */
                iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        /* wake up anyone waiting on r_safe_completion; no 'safe' reply
         * can be delivered once the request is unregistered */
        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);     /* drop the tree's reference */
}
646
647 /*
648  * Choose mds to send request to next.  If there is a hint set in the
649  * request (e.g., due to a prior forward hint from the mds), use that.
650  * Otherwise, consult frag tree and/or caps to identify the
651  * appropriate mds.  If all else fails, choose randomly.
652  *
653  * Called under mdsc->mutex.
654  */
655 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
656 {
657         /*
658          * we don't need to worry about protecting the d_parent access
659          * here because we never renaming inside the snapped namespace
660          * except to resplice to another snapdir, and either the old or new
661          * result is a valid result.
662          */
663         while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
664                 dentry = dentry->d_parent;
665         return dentry;
666 }
667
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = req->r_direct_is_hash;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("choose_mds using resend_mds mds%d\n",
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        /* pick the inode whose caps/frag tree should steer the choice */
        inode = NULL;
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent = req->r_dentry->d_parent;
                struct inode *dir = parent->d_inode;

                if (dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs! */
                        inode = req->r_dentry->d_inode;
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        struct dentry *dn = get_nonsnap_parent(parent);
                        inode = dn->d_inode;
                        dout("__choose_mds using nonsnap parent %p\n", inode);
                } else if (req->r_dentry->d_inode) {
                        /* dentry target */
                        inode = req->r_dentry->d_inode;
                } else {
                        /* dir + name */
                        inode = dir;
                        hash = ceph_dentry_hash(dir, req->r_dentry);
                        is_hash = true;
                }
        }

        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        /* for hashed dentry ops on a dir, consult the fragment tree */
        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (%d/%d)\n",
                                     inode, ceph_vinop(inode),
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                /* only use it if that mds is usable */
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        return mds;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
                        mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("choose_mds %p %llx.%llx "
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE)
                                        return mds;
                        }
                }
        }

        /* otherwise follow the caps: prefer the auth cap's session,
         * else the first cap's session */
        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
        return mds;

random:
        /* last resort: whatever the mdsmap offers */
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("choose_mds chose random mds%d\n", mds);
        return mds;
}
790
791
792 /*
793  * session messages
794  */
795 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
796 {
797         struct ceph_msg *msg;
798         struct ceph_mds_session_head *h;
799
800         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
801                            false);
802         if (!msg) {
803                 pr_err("create_session_msg ENOMEM creating msg\n");
804                 return NULL;
805         }
806         h = msg->front.iov_base;
807         h->op = cpu_to_le32(op);
808         h->seq = cpu_to_le64(seq);
809         return msg;
810 }
811
812 /*
813  * send session open request.
814  *
815  * called under mdsc->mutex
816  */
/*
 * send session open request.
 *
 * called under mdsc->mutex
 *
 * Marks the session OPENING, records the renew timestamp, and queues
 * a REQUEST_OPEN message on its connection.  Returns 0 or -ENOMEM.
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
}
838
839 /*
840  * open sessions for any export targets for the given mds
841  *
842  * called under mdsc->mutex
843  */
844 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
845                                           struct ceph_mds_session *session)
846 {
847         struct ceph_mds_info *mi;
848         struct ceph_mds_session *ts;
849         int i, mds = session->s_mds;
850         int target;
851
852         if (mds >= mdsc->mdsmap->m_max_mds)
853                 return;
854         mi = &mdsc->mdsmap->m_info[mds];
855         dout("open_export_target_sessions for mds%d (%d targets)\n",
856              session->s_mds, mi->num_export_targets);
857
858         for (i = 0; i < mi->num_export_targets; i++) {
859                 target = mi->export_targets[i];
860                 ts = __ceph_lookup_mds_session(mdsc, target);
861                 if (!ts) {
862                         ts = register_session(mdsc, target);
863                         if (IS_ERR(ts))
864                                 return;
865                 }
866                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
867                     session->s_state == CEPH_MDS_SESSION_CLOSING)
868                         __open_session(mdsc, session);
869                 else
870                         dout(" mds%d target mds%d %p is %s\n", session->s_mds,
871                              i, ts, session_state_name(ts->s_state));
872                 ceph_put_mds_session(ts);
873         }
874 }
875
/*
 * Locked wrapper around __open_export_target_sessions(), for callers
 * that do not already hold mdsc->mutex.
 */
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}
883
884 /*
885  * session caps
886  */
887
888 /*
889  * Free preallocated cap messages assigned to this session
890  */
891 static void cleanup_cap_releases(struct ceph_mds_session *session)
892 {
893         struct ceph_msg *msg;
894
895         spin_lock(&session->s_cap_lock);
896         while (!list_empty(&session->s_cap_releases)) {
897                 msg = list_first_entry(&session->s_cap_releases,
898                                        struct ceph_msg, list_head);
899                 list_del_init(&msg->list_head);
900                 ceph_msg_put(msg);
901         }
902         while (!list_empty(&session->s_cap_releases_done)) {
903                 msg = list_first_entry(&session->s_cap_releases_done,
904                                        struct ceph_msg, list_head);
905                 list_del_init(&msg->list_head);
906                 ceph_msg_put(msg);
907         }
908         spin_unlock(&session->s_cap_lock);
909 }
910
911 /*
912  * Helper to safely iterate over all caps associated with a session, with
913  * special care taken to handle a racing __ceph_remove_cap().
914  *
915  * Caller must hold session s_mutex.
916  */
917 static int iterate_session_caps(struct ceph_mds_session *session,
918                                  int (*cb)(struct inode *, struct ceph_cap *,
919                                             void *), void *arg)
920 {
921         struct list_head *p;
922         struct ceph_cap *cap;
923         struct inode *inode, *last_inode = NULL;
924         struct ceph_cap *old_cap = NULL;
925         int ret;
926
927         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
928         spin_lock(&session->s_cap_lock);
929         p = session->s_caps.next;
930         while (p != &session->s_caps) {
931                 cap = list_entry(p, struct ceph_cap, session_caps);
932                 inode = igrab(&cap->ci->vfs_inode);
933                 if (!inode) {
934                         p = p->next;
935                         continue;
936                 }
937                 session->s_cap_iterator = cap;
938                 spin_unlock(&session->s_cap_lock);
939
940                 if (last_inode) {
941                         iput(last_inode);
942                         last_inode = NULL;
943                 }
944                 if (old_cap) {
945                         ceph_put_cap(session->s_mdsc, old_cap);
946                         old_cap = NULL;
947                 }
948
949                 ret = cb(inode, cap, arg);
950                 last_inode = inode;
951
952                 spin_lock(&session->s_cap_lock);
953                 p = p->next;
954                 if (cap->ci == NULL) {
955                         dout("iterate_session_caps  finishing cap %p removal\n",
956                              cap);
957                         BUG_ON(cap->session != session);
958                         list_del_init(&cap->session_caps);
959                         session->s_nr_caps--;
960                         cap->session = NULL;
961                         old_cap = cap;  /* put_cap it w/o locks held */
962                 }
963                 if (ret < 0)
964                         goto out;
965         }
966         ret = 0;
967 out:
968         session->s_cap_iterator = NULL;
969         spin_unlock(&session->s_cap_lock);
970
971         if (last_inode)
972                 iput(last_inode);
973         if (old_cap)
974                 ceph_put_cap(session->s_mdsc, old_cap);
975
976         return ret;
977 }
978
/*
 * iterate_session_caps() callback: remove this cap from the inode and,
 * if it was the last real cap, throw away any dirty/flushing state.
 */
static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        int drop = 0;  /* count of inode references to release below */

        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
        spin_lock(&ci->i_ceph_lock);
        __ceph_remove_cap(cap);
        if (!__ceph_is_any_real_caps(ci)) {
                /* no caps left: forcibly drop dirty/flushing state */
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(inode->i_sb)->mdsc;

                spin_lock(&mdsc->cap_dirty_lock);
                if (!list_empty(&ci->i_dirty_item)) {
                        pr_info(" dropping dirty %s state for %p %lld\n",
                                ceph_cap_string(ci->i_dirty_caps),
                                inode, ceph_ino(inode));
                        ci->i_dirty_caps = 0;
                        list_del_init(&ci->i_dirty_item);
                        drop = 1;
                }
                if (!list_empty(&ci->i_flushing_item)) {
                        pr_info(" dropping dirty+flushing %s state for %p %lld\n",
                                ceph_cap_string(ci->i_flushing_caps),
                                inode, ceph_ino(inode));
                        ci->i_flushing_caps = 0;
                        list_del_init(&ci->i_flushing_item);
                        mdsc->num_cap_flushing--;
                        drop = 1;
                }
                if (drop && ci->i_wrbuffer_ref) {
                        pr_info(" dropping dirty data for %p %lld\n",
                                inode, ceph_ino(inode));
                        ci->i_wrbuffer_ref = 0;
                        ci->i_wrbuffer_ref_head = 0;
                        drop++;
                }
                spin_unlock(&mdsc->cap_dirty_lock);
        }
        spin_unlock(&ci->i_ceph_lock);
        /* release the inode references that backed the state dropped
         * above; done after unlocking since iput() can sleep */
        while (drop--)
                iput(inode);
        return 0;
}
1025
1026 /*
1027  * caller must hold session s_mutex
1028  */
1029 static void remove_session_caps(struct ceph_mds_session *session)
1030 {
1031         dout("remove_session_caps on %p\n", session);
1032         iterate_session_caps(session, remove_session_caps_cb, NULL);
1033         BUG_ON(session->s_nr_caps > 0);
1034         BUG_ON(!list_empty(&session->s_cap_flushing));
1035         cleanup_cap_releases(session);
1036 }
1037
1038 /*
1039  * wake up any threads waiting on this session's caps.  if the cap is
1040  * old (didn't get renewed on the client reconnect), remove it now.
1041  *
1042  * caller must hold s_mutex.
1043  */
1044 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1045                               void *arg)
1046 {
1047         struct ceph_inode_info *ci = ceph_inode(inode);
1048
1049         wake_up_all(&ci->i_cap_wq);
1050         if (arg) {
1051                 spin_lock(&ci->i_ceph_lock);
1052                 ci->i_wanted_max_size = 0;
1053                 ci->i_requested_max_size = 0;
1054                 spin_unlock(&ci->i_ceph_lock);
1055         }
1056         return 0;
1057 }
1058
/*
 * Wake cap waiters on every inode with a cap from this session.
 * @reconnect is forwarded to wake_up_session_cb() as a NULL/non-NULL
 * pointer (non-NULL additionally clears max_size state).
 */
static void wake_up_session_caps(struct ceph_mds_session *session,
                                 int reconnect)
{
        dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
        iterate_session_caps(session, wake_up_session_cb,
                             (void *)(unsigned long)reconnect);
}
1066
1067 /*
1068  * Send periodic message to MDS renewing all currently held caps.  The
1069  * ack will reset the expiration for all caps from this session.
1070  *
1071  * caller holds s_mutex
1072  */
1073 static int send_renew_caps(struct ceph_mds_client *mdsc,
1074                            struct ceph_mds_session *session)
1075 {
1076         struct ceph_msg *msg;
1077         int state;
1078
1079         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1080             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1081                 pr_info("mds%d caps stale\n", session->s_mds);
1082         session->s_renew_requested = jiffies;
1083
1084         /* do not try to renew caps until a recovering mds has reconnected
1085          * with its clients. */
1086         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1087         if (state < CEPH_MDS_STATE_RECONNECT) {
1088                 dout("send_renew_caps ignoring mds%d (%s)\n",
1089                      session->s_mds, ceph_mds_state_name(state));
1090                 return 0;
1091         }
1092
1093         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1094                 ceph_mds_state_name(state));
1095         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1096                                  ++session->s_renew_seq);
1097         if (!msg)
1098                 return -ENOMEM;
1099         ceph_con_send(&session->s_con, msg);
1100         return 0;
1101 }
1102
1103 /*
1104  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1105  *
1106  * Called under session->s_mutex
1107  */
1108 static void renewed_caps(struct ceph_mds_client *mdsc,
1109                          struct ceph_mds_session *session, int is_renew)
1110 {
1111         int was_stale;
1112         int wake = 0;
1113
1114         spin_lock(&session->s_cap_lock);
1115         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1116
1117         session->s_cap_ttl = session->s_renew_requested +
1118                 mdsc->mdsmap->m_session_timeout*HZ;
1119
1120         if (was_stale) {
1121                 if (time_before(jiffies, session->s_cap_ttl)) {
1122                         pr_info("mds%d caps renewed\n", session->s_mds);
1123                         wake = 1;
1124                 } else {
1125                         pr_info("mds%d caps still stale\n", session->s_mds);
1126                 }
1127         }
1128         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1129              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1130              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1131         spin_unlock(&session->s_cap_lock);
1132
1133         if (wake)
1134                 wake_up_session_caps(session, 0);
1135 }
1136
1137 /*
1138  * send a session close request
1139  */
1140 static int request_close_session(struct ceph_mds_client *mdsc,
1141                                  struct ceph_mds_session *session)
1142 {
1143         struct ceph_msg *msg;
1144
1145         dout("request_close_session mds%d state %s seq %lld\n",
1146              session->s_mds, session_state_name(session->s_state),
1147              session->s_seq);
1148         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1149         if (!msg)
1150                 return -ENOMEM;
1151         ceph_con_send(&session->s_con, msg);
1152         return 0;
1153 }
1154
1155 /*
1156  * Called with s_mutex held.
1157  */
1158 static int __close_session(struct ceph_mds_client *mdsc,
1159                          struct ceph_mds_session *session)
1160 {
1161         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1162                 return 0;
1163         session->s_state = CEPH_MDS_SESSION_CLOSING;
1164         return request_close_session(mdsc, session);
1165 }
1166
1167 /*
1168  * Trim old(er) caps.
1169  *
1170  * Because we can't cache an inode without one or more caps, we do
1171  * this indirectly: if a cap is unused, we prune its aliases, at which
1172  * point the inode will hopefully get dropped to.
1173  *
1174  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1175  * memory pressure from the MDS, though, so it needn't be perfect.
1176  */
1177 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1178 {
1179         struct ceph_mds_session *session = arg;
1180         struct ceph_inode_info *ci = ceph_inode(inode);
1181         int used, oissued, mine;
1182
1183         if (session->s_trim_caps <= 0)
1184                 return -1;
1185
1186         spin_lock(&ci->i_ceph_lock);
1187         mine = cap->issued | cap->implemented;
1188         used = __ceph_caps_used(ci);
1189         oissued = __ceph_caps_issued_other(ci, cap);
1190
1191         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1192              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1193              ceph_cap_string(used));
1194         if (ci->i_dirty_caps)
1195                 goto out;   /* dirty caps */
1196         if ((used & ~oissued) & mine)
1197                 goto out;   /* we need these caps */
1198
1199         session->s_trim_caps--;
1200         if (oissued) {
1201                 /* we aren't the only cap.. just remove us */
1202                 __queue_cap_release(session, ceph_ino(inode), cap->cap_id,
1203                                     cap->mseq, cap->issue_seq);
1204                 __ceph_remove_cap(cap);
1205         } else {
1206                 /* try to drop referring dentries */
1207                 spin_unlock(&ci->i_ceph_lock);
1208                 d_prune_aliases(inode);
1209                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1210                      inode, cap, atomic_read(&inode->i_count));
1211                 return 0;
1212         }
1213
1214 out:
1215         spin_unlock(&ci->i_ceph_lock);
1216         return 0;
1217 }
1218
1219 /*
1220  * Trim session cap count down to some max number.
1221  */
1222 static int trim_caps(struct ceph_mds_client *mdsc,
1223                      struct ceph_mds_session *session,
1224                      int max_caps)
1225 {
1226         int trim_caps = session->s_nr_caps - max_caps;
1227
1228         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1229              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1230         if (trim_caps > 0) {
1231                 session->s_trim_caps = trim_caps;
1232                 iterate_session_caps(session, trim_caps_cb, session);
1233                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1234                      session->s_mds, session->s_nr_caps, max_caps,
1235                         trim_caps - session->s_trim_caps);
1236                 session->s_trim_caps = 0;
1237         }
1238         return 0;
1239 }
1240
1241 /*
1242  * Allocate cap_release messages.  If there is a partially full message
1243  * in the queue, try to allocate enough to cover it's remainder, so that
1244  * we can send it immediately.
1245  *
1246  * Called under s_mutex.
1247  */
1248 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1249                           struct ceph_mds_session *session)
1250 {
1251         struct ceph_msg *msg, *partial = NULL;
1252         struct ceph_mds_cap_release *head;
1253         int err = -ENOMEM;
1254         int extra = mdsc->fsc->mount_options->cap_release_safety;
1255         int num;
1256
1257         dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1258              extra);
1259
1260         spin_lock(&session->s_cap_lock);
1261
1262         if (!list_empty(&session->s_cap_releases)) {
1263                 msg = list_first_entry(&session->s_cap_releases,
1264                                        struct ceph_msg,
1265                                  list_head);
1266                 head = msg->front.iov_base;
1267                 num = le32_to_cpu(head->num);
1268                 if (num) {
1269                         dout(" partial %p with (%d/%d)\n", msg, num,
1270                              (int)CEPH_CAPS_PER_RELEASE);
1271                         extra += CEPH_CAPS_PER_RELEASE - num;
1272                         partial = msg;
1273                 }
1274         }
1275         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1276                 spin_unlock(&session->s_cap_lock);
1277                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1278                                    GFP_NOFS, false);
1279                 if (!msg)
1280                         goto out_unlocked;
1281                 dout("add_cap_releases %p msg %p now %d\n", session, msg,
1282                      (int)msg->front.iov_len);
1283                 head = msg->front.iov_base;
1284                 head->num = cpu_to_le32(0);
1285                 msg->front.iov_len = sizeof(*head);
1286                 spin_lock(&session->s_cap_lock);
1287                 list_add(&msg->list_head, &session->s_cap_releases);
1288                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1289         }
1290
1291         if (partial) {
1292                 head = partial->front.iov_base;
1293                 num = le32_to_cpu(head->num);
1294                 dout(" queueing partial %p with %d/%d\n", partial, num,
1295                      (int)CEPH_CAPS_PER_RELEASE);
1296                 list_move_tail(&partial->list_head,
1297                                &session->s_cap_releases_done);
1298                 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1299         }
1300         err = 0;
1301         spin_unlock(&session->s_cap_lock);
1302 out_unlocked:
1303         return err;
1304 }
1305
1306 /*
1307  * flush all dirty inode data to disk.
1308  *
1309  * returns true if we've flushed through want_flush_seq
1310  */
1311 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1312 {
1313         int mds, ret = 1;
1314
1315         dout("check_cap_flush want %lld\n", want_flush_seq);
1316         mutex_lock(&mdsc->mutex);
1317         for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1318                 struct ceph_mds_session *session = mdsc->sessions[mds];
1319
1320                 if (!session)
1321                         continue;
1322                 get_session(session);
1323                 mutex_unlock(&mdsc->mutex);
1324
1325                 mutex_lock(&session->s_mutex);
1326                 if (!list_empty(&session->s_cap_flushing)) {
1327                         struct ceph_inode_info *ci =
1328                                 list_entry(session->s_cap_flushing.next,
1329                                            struct ceph_inode_info,
1330                                            i_flushing_item);
1331                         struct inode *inode = &ci->vfs_inode;
1332
1333                         spin_lock(&ci->i_ceph_lock);
1334                         if (ci->i_cap_flush_seq <= want_flush_seq) {
1335                                 dout("check_cap_flush still flushing %p "
1336                                      "seq %lld <= %lld to mds%d\n", inode,
1337                                      ci->i_cap_flush_seq, want_flush_seq,
1338                                      session->s_mds);
1339                                 ret = 0;
1340                         }
1341                         spin_unlock(&ci->i_ceph_lock);
1342                 }
1343                 mutex_unlock(&session->s_mutex);
1344                 ceph_put_mds_session(session);
1345
1346                 if (!ret)
1347                         return ret;
1348                 mutex_lock(&mdsc->mutex);
1349         }
1350
1351         mutex_unlock(&mdsc->mutex);
1352         dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1353         return ret;
1354 }
1355
1356 /*
1357  * called under s_mutex
1358  */
1359 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1360                             struct ceph_mds_session *session)
1361 {
1362         struct ceph_msg *msg;
1363
1364         dout("send_cap_releases mds%d\n", session->s_mds);
1365         spin_lock(&session->s_cap_lock);
1366         while (!list_empty(&session->s_cap_releases_done)) {
1367                 msg = list_first_entry(&session->s_cap_releases_done,
1368                                  struct ceph_msg, list_head);
1369                 list_del_init(&msg->list_head);
1370                 spin_unlock(&session->s_cap_lock);
1371                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1372                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1373                 ceph_con_send(&session->s_con, msg);
1374                 spin_lock(&session->s_cap_lock);
1375         }
1376         spin_unlock(&session->s_cap_lock);
1377 }
1378
1379 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1380                                  struct ceph_mds_session *session)
1381 {
1382         struct ceph_msg *msg;
1383         struct ceph_mds_cap_release *head;
1384         unsigned num;
1385
1386         dout("discard_cap_releases mds%d\n", session->s_mds);
1387         spin_lock(&session->s_cap_lock);
1388
1389         /* zero out the in-progress message */
1390         msg = list_first_entry(&session->s_cap_releases,
1391                                struct ceph_msg, list_head);
1392         head = msg->front.iov_base;
1393         num = le32_to_cpu(head->num);
1394         dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1395         head->num = cpu_to_le32(0);
1396         session->s_num_cap_releases += num;
1397
1398         /* requeue completed messages */
1399         while (!list_empty(&session->s_cap_releases_done)) {
1400                 msg = list_first_entry(&session->s_cap_releases_done,
1401                                  struct ceph_msg, list_head);
1402                 list_del_init(&msg->list_head);
1403
1404                 head = msg->front.iov_base;
1405                 num = le32_to_cpu(head->num);
1406                 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1407                      num);
1408                 session->s_num_cap_releases += num;
1409                 head->num = cpu_to_le32(0);
1410                 msg->front.iov_len = sizeof(*head);
1411                 list_add(&msg->list_head, &session->s_cap_releases);
1412         }
1413
1414         spin_unlock(&session->s_cap_lock);
1415 }
1416
1417 /*
1418  * requests
1419  */
1420
1421 /*
1422  * Create an mds request.
1423  */
1424 struct ceph_mds_request *
1425 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1426 {
1427         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1428
1429         if (!req)
1430                 return ERR_PTR(-ENOMEM);
1431
1432         mutex_init(&req->r_fill_mutex);
1433         req->r_mdsc = mdsc;
1434         req->r_started = jiffies;
1435         req->r_resend_mds = -1;
1436         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1437         req->r_fmode = -1;
1438         kref_init(&req->r_kref);
1439         INIT_LIST_HEAD(&req->r_wait);
1440         init_completion(&req->r_completion);
1441         init_completion(&req->r_safe_completion);
1442         INIT_LIST_HEAD(&req->r_unsafe_item);
1443
1444         req->r_op = op;
1445         req->r_direct_mode = mode;
1446         return req;
1447 }
1448
1449 /*
1450  * return oldest (lowest) request, tid in request tree, 0 if none.
1451  *
1452  * called under mdsc->mutex.
1453  */
1454 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1455 {
1456         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1457                 return NULL;
1458         return rb_entry(rb_first(&mdsc->request_tree),
1459                         struct ceph_mds_request, r_node);
1460 }
1461
1462 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1463 {
1464         struct ceph_mds_request *req = __get_oldest_req(mdsc);
1465
1466         if (req)
1467                 return req->r_tid;
1468         return 0;
1469 }
1470
1471 /*
1472  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1473  * on build_path_from_dentry in fs/cifs/dir.c.
1474  *
1475  * If @stop_on_nosnap, generate path relative to the first non-snapped
1476  * inode.
1477  *
1478  * Encode hidden .snap dirs as a double /, i.e.
1479  *   foo/.snap/bar -> foo//bar
1480  */
1481 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1482                            int stop_on_nosnap)
1483 {
1484         struct dentry *temp;
1485         char *path;
1486         int len, pos;
1487         unsigned seq;
1488
1489         if (dentry == NULL)
1490                 return ERR_PTR(-EINVAL);
1491
1492 retry:
1493         len = 0;
1494         seq = read_seqbegin(&rename_lock);
1495         rcu_read_lock();
1496         for (temp = dentry; !IS_ROOT(temp);) {
1497                 struct inode *inode = temp->d_inode;
1498                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1499                         len++;  /* slash only */
1500                 else if (stop_on_nosnap && inode &&
1501                          ceph_snap(inode) == CEPH_NOSNAP)
1502                         break;
1503                 else
1504                         len += 1 + temp->d_name.len;
1505                 temp = temp->d_parent;
1506         }
1507         rcu_read_unlock();
1508         if (len)
1509                 len--;  /* no leading '/' */
1510
1511         path = kmalloc(len+1, GFP_NOFS);
1512         if (path == NULL)
1513                 return ERR_PTR(-ENOMEM);
1514         pos = len;
1515         path[pos] = 0;  /* trailing null */
1516         rcu_read_lock();
1517         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1518                 struct inode *inode;
1519
1520                 spin_lock(&temp->d_lock);
1521                 inode = temp->d_inode;
1522                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1523                         dout("build_path path+%d: %p SNAPDIR\n",
1524                              pos, temp);
1525                 } else if (stop_on_nosnap && inode &&
1526                            ceph_snap(inode) == CEPH_NOSNAP) {
1527                         spin_unlock(&temp->d_lock);
1528                         break;
1529                 } else {
1530                         pos -= temp->d_name.len;
1531                         if (pos < 0) {
1532                                 spin_unlock(&temp->d_lock);
1533                                 break;
1534                         }
1535                         strncpy(path + pos, temp->d_name.name,
1536                                 temp->d_name.len);
1537                 }
1538                 spin_unlock(&temp->d_lock);
1539                 if (pos)
1540                         path[--pos] = '/';
1541                 temp = temp->d_parent;
1542         }
1543         rcu_read_unlock();
1544         if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1545                 pr_err("build_path did not end path lookup where "
1546                        "expected, namelen is %d, pos is %d\n", len, pos);
1547                 /* presumably this is only possible if racing with a
1548                    rename of one of the parent directories (we can not
1549                    lock the dentries above us to prevent this, but
1550                    retrying should be harmless) */
1551                 kfree(path);
1552                 goto retry;
1553         }
1554
1555         *base = ceph_ino(temp->d_inode);
1556         *plen = len;
1557         dout("build_path on %p %d built %llx '%.*s'\n",
1558              dentry, dentry->d_count, *base, len, path);
1559         return path;
1560 }
1561
1562 static int build_dentry_path(struct dentry *dentry,
1563                              const char **ppath, int *ppathlen, u64 *pino,
1564                              int *pfreepath)
1565 {
1566         char *path;
1567
1568         if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1569                 *pino = ceph_ino(dentry->d_parent->d_inode);
1570                 *ppath = dentry->d_name.name;
1571                 *ppathlen = dentry->d_name.len;
1572                 return 0;
1573         }
1574         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1575         if (IS_ERR(path))
1576                 return PTR_ERR(path);
1577         *ppath = path;
1578         *pfreepath = 1;
1579         return 0;
1580 }
1581
1582 static int build_inode_path(struct inode *inode,
1583                             const char **ppath, int *ppathlen, u64 *pino,
1584                             int *pfreepath)
1585 {
1586         struct dentry *dentry;
1587         char *path;
1588
1589         if (ceph_snap(inode) == CEPH_NOSNAP) {
1590                 *pino = ceph_ino(inode);
1591                 *ppathlen = 0;
1592                 return 0;
1593         }
1594         dentry = d_find_alias(inode);
1595         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1596         dput(dentry);
1597         if (IS_ERR(path))
1598                 return PTR_ERR(path);
1599         *ppath = path;
1600         *pfreepath = 1;
1601         return 0;
1602 }
1603
1604 /*
1605  * request arguments may be specified via an inode *, a dentry *, or
1606  * an explicit ino+path.
1607  */
1608 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1609                                   const char *rpath, u64 rino,
1610                                   const char **ppath, int *pathlen,
1611                                   u64 *ino, int *freepath)
1612 {
1613         int r = 0;
1614
1615         if (rinode) {
1616                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1617                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1618                      ceph_snap(rinode));
1619         } else if (rdentry) {
1620                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1621                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1622                      *ppath);
1623         } else if (rpath || rino) {
1624                 *ino = rino;
1625                 *ppath = rpath;
1626                 *pathlen = rpath ? strlen(rpath) : 0;
1627                 dout(" path %.*s\n", *pathlen, rpath);
1628         }
1629
1630         return r;
1631 }
1632
/*
 * Build a CEPH_MSG_CLIENT_REQUEST message for @req, addressed to @mds.
 * Returns the new message, or an ERR_PTR on path-build or allocation
 * failure.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	int freepath1 = 0, freepath2 = 0;	/* kfree path1/path2 on exit? */
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	/* resolve primary path (from r_inode/r_dentry or explicit
	 * r_path1/r_ino1) */
	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* resolve secondary path (from r_old_dentry or explicit
	 * r_path2/r_ino2) */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* header plus (max) space for the two encoded filepaths */
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += req->r_dentry->d_name.len;
	if (req->r_old_dentry_drop)
		len += req->r_old_dentry->d_name.len;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
		       mds, req->r_dentry_drop, req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_old_dentry->d_inode,
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
	head->num_releases = cpu_to_le16(releases);

	/* trim the front to what was actually encoded */
	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_data_len) {
		/* outbound data set only by ceph_sync_setxattr() */
		BUG_ON(!req->r_pages);
		ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
	}

	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
	msg->hdr.data_off = cpu_to_le16(0);

out_free2:
	if (freepath2)
		kfree((char *)path2);
out_free1:
	if (freepath1)
		kfree((char *)path1);
out:
	return msg;
}
1744
1745 /*
1746  * called under mdsc->mutex if error, under no mutex if
1747  * success.
1748  */
1749 static void complete_request(struct ceph_mds_client *mdsc,
1750                              struct ceph_mds_request *req)
1751 {
1752         if (req->r_callback)
1753                 req->r_callback(mdsc, req);
1754         else
1755                 complete_all(&req->r_completion);
1756 }
1757
/*
 * Prepare req->r_request for (re)transmission to @mds.  Returns 0 on
 * success, or a negative errno — in which case the waiter has already
 * been woken via complete_request().
 *
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		/* record the cap mseq we send against; handle_reply()
		 * compares it when deciding whether to resend on ESTALE */
		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (req->r_got_unsafe) {
		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;
		msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
		msg->front.iov_len = req->r_request_release_offset;
		return 0;
	}

	/* not a replay: drop any stale message and build a fresh one */
	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		complete_request(mdsc, req);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (req->r_got_unsafe)
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (req->r_locked_dir)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;
	rhead->ino = 0;

	dout(" r_locked_dir = %p\n", req->r_locked_dir);
	return 0;
}
1834
/*
 * send request, or put it on the appropriate wait list.
 *
 * Pick an mds for @req, ensure we have an open session to it, then
 * prepare and transmit.  If no suitable mds or session is available
 * yet, the request is parked on waiting_for_map or the session's
 * s_waiting list.  On a hard failure the waiter is woken with r_err
 * set (via complete_request()).
 *
 * called under mdsc->mutex
 */
static int __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = -EAGAIN;

	/* already finished (or failed)?  nothing to send */
	if (req->r_err || req->r_got_result) {
		if (req->r_aborted)
			__unregister_request(mdsc, req);
		goto out;
	}

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -EIO;
		goto finish;
	}

	/* release the session from any previous send attempt */
	put_request_session(req);

	mds = __choose_mds(mdsc, req);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		goto out;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = get_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/* session not usable yet: kick off an open if needed and
		 * park the request on the session's wait list */
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING)
			__open_session(mdsc, session);
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __prepare_send_request(mdsc, req, mds);
	if (!err) {
		/* extra ref on the message for the connection */
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

out_session:
	ceph_put_mds_session(session);
out:
	return err;

finish:
	req->r_err = err;
	complete_request(mdsc, req);
	goto out;
}
1912
1913 /*
1914  * called under mdsc->mutex
1915  */
1916 static void __wake_requests(struct ceph_mds_client *mdsc,
1917                             struct list_head *head)
1918 {
1919         struct ceph_mds_request *req;
1920         LIST_HEAD(tmp_list);
1921
1922         list_splice_init(head, &tmp_list);
1923
1924         while (!list_empty(&tmp_list)) {
1925                 req = list_entry(tmp_list.next,
1926                                  struct ceph_mds_request, r_wait);
1927                 list_del_init(&req->r_wait);
1928                 dout(" wake request %p tid %llu\n", req, req->r_tid);
1929                 __do_request(mdsc, req);
1930         }
1931 }
1932
1933 /*
1934  * Wake up threads with requests pending for @mds, so that they can
1935  * resubmit their requests to a possibly different mds.
1936  */
1937 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1938 {
1939         struct ceph_mds_request *req;
1940         struct rb_node *p;
1941
1942         dout("kick_requests mds%d\n", mds);
1943         for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1944                 req = rb_entry(p, struct ceph_mds_request, r_node);
1945                 if (req->r_got_unsafe)
1946                         continue;
1947                 if (req->r_session &&
1948                     req->r_session->s_mds == mds) {
1949                         dout(" kicking tid %llu\n", req->r_tid);
1950                         __do_request(mdsc, req);
1951                 }
1952         }
1953 }
1954
/*
 * Submit @req without waiting for the result; the submitter is
 * notified later via r_callback or r_completion (see
 * complete_request()).
 */
void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);	/* NULL dir, cf. ceph_mdsc_do_request() */
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}
1964
/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 *
 * Returns the MDS result code on a real reply, or a negative errno on
 * early failure, timeout, or interruption.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_locked_dir)
		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_old_dentry)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	/* issue */
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);

	/* immediate failure (e.g. message build or session error)? */
	if (req->r_err) {
		err = req->r_err;
		__unregister_request(mdsc, req);
		dout("do_request early error %d\n", err);
		goto out;
	}

	/* wait */
	mutex_unlock(&mdsc->mutex);
	dout("do_request waiting\n");
	if (req->r_timeout) {
		err = (long)wait_for_completion_killable_timeout(
			&req->r_completion, req->r_timeout);
		if (err == 0)
			err = -EIO;	/* timed out */
	} else {
		err = wait_for_completion_killable(&req->r_completion);
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (req->r_got_result) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		/* wait was interrupted (fatal signal) */
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		req->r_aborted = true;
		mutex_unlock(&req->r_fill_mutex);

		/* an aborted namespace op may have changed the dir */
		if (req->r_locked_dir &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

out:
	mutex_unlock(&mdsc->mutex);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}
2040
2041 /*
2042  * Invalidate dir's completeness, dentry lease state on an aborted MDS
2043  * namespace request.
2044  */
2045 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2046 {
2047         struct inode *inode = req->r_locked_dir;
2048
2049         dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2050
2051         ceph_dir_clear_complete(inode);
2052         if (req->r_dentry)
2053                 ceph_invalidate_dentry_lease(req->r_dentry);
2054         if (req->r_old_dentry)
2055                 ceph_invalidate_dentry_lease(req->r_old_dentry);
2056 }
2057
/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	u64 tid;
	int err, result;
	int mds = session->s_mds;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = __lookup_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((req->r_got_unsafe && !head->safe) ||
	    (req->r_got_safe && head->safe)) {
		pr_warning("got a dup %s reply on %llu from mds%d\n",
			   head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	/* an unsafe reply is never expected after the safe one */
	if (req->r_got_safe && !head->safe) {
		pr_warning("got unsafe after safe on %llu from mds%d\n",
			   tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	/*
	 * Handle an ESTALE
	 * if we're not talking to the authority, send to them
	 * if the authority has changed while we weren't looking,
	 * send to new authority
	 * Otherwise we just have to return an ESTALE
	 */
	if (result == -ESTALE) {
		dout("got ESTALE on request %llu", req->r_tid);
		if (!req->r_inode) {
			/* do nothing; not an authority problem */
		} else if (req->r_direct_mode != USE_AUTH_MDS) {
			dout("not using auth, setting for that now");
			req->r_direct_mode = USE_AUTH_MDS;
			__do_request(mdsc, req);
			mutex_unlock(&mdsc->mutex);
			goto out;
		} else  {
			struct ceph_inode_info *ci = ceph_inode(req->r_inode);
			struct ceph_cap *cap = NULL;

			if (req->r_session)
				cap = ceph_get_cap_for_mds(ci,
						   req->r_session->s_mds);

			dout("already using auth");
			/* resend if the auth cap changed since we sent;
			 * r_sent_on_mseq was recorded by
			 * __prepare_send_request() */
			if ((!cap || cap != ci->i_auth_cap) ||
			    (cap->mseq != req->r_sent_on_mseq)) {
				dout("but cap changed, so resending");
				__do_request(mdsc, req);
				mutex_unlock(&mdsc->mutex);
				goto out;
			}
		}
		dout("have to return ESTALE on request %llu", req->r_tid);
	}


	if (head->safe) {
		req->r_got_safe = true;
		__unregister_request(mdsc, req);

		if (req->r_got_unsafe) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);
			list_del_init(&req->r_unsafe_item);

			/* last unsafe request during umount? */
			if (mdsc->stopping && !__get_oldest_req(mdsc))
				complete_all(&mdsc->safe_umount_waiters);
			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		/* first (unsafe) reply: track until the safe one arrives */
		req->r_got_unsafe = true;
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		ceph_update_snap_trace(mdsc, rinfo->snapblob,
			       rinfo->snapblob + rinfo->snapblob_len,
			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
		/* keep snap_rwsem for reading through the fill below */
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP) &&
		    rinfo->dir_nr)
			ceph_readdir_prepopulate(req, req->r_session);
		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
out_err:
	/* record the outcome, unless the waiter already aborted */
	mutex_lock(&mdsc->mutex);
	if (!req->r_aborted) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = msg;
			ceph_msg_get(msg);
			req->r_got_result = true;
		}
	} else {
		dout("reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	ceph_add_cap_releases(mdsc, req->r_session);
	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);
out:
	ceph_mdsc_put_request(req);
	return;
}
2242
2243
2244
/*
 * handle mds notification that our request has been forwarded.
 *
 * Decodes the new target mds and forward sequence number, then resends
 * the request there (unless the notification is stale or the request
 * was aborted).
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = __lookup_request(mdsc, tid);
	if (!req) {
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		goto out;  /* dup reply? */
	}

	if (req->r_aborted) {
		/* waiter gave up on this request; drop it */
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		/* stale notification; we already acted on a newer one */
		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
		     tid, next_mds, req->r_num_fwd, fwd_seq);
	} else {
		/* resend. forward race not possible; mds would drop */
		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
		BUG_ON(req->r_err);
		BUG_ON(req->r_got_result);
		req->r_num_fwd = fwd_seq;
		req->r_resend_mds = next_mds;
		put_request_session(req);
		__do_request(mdsc, req);
	}
	ceph_mdsc_put_request(req);	/* drop __lookup_request() ref */
out:
	mutex_unlock(&mdsc->mutex);
	return;

bad:
	pr_err("mdsc_handle_forward decode error err=%d\n", err);
}
2295
/*
 * handle a mds session control message
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	u32 op;
	u64 seq;
	int mds = session->s_mds;
	struct ceph_mds_session_head *h = msg->front.iov_base;
	int wake = 0;	/* wake requests parked on s_waiting afterward? */

	/* decode */
	if (msg->front.iov_len != sizeof(*h))
		goto bad;
	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_CLOSE)
		__unregister_session(mdsc, session);
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	dout("handle_session mds%d %s %p state %s seq %llu\n",
	     mds, ceph_session_op_name(op), session,
	     session_state_name(session->s_state), seq);

	/* hearing from a HUNG mds means it is responsive again */
	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info("mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect success\n", session->s_mds);
		session->s_state = CEPH_MDS_SESSION_OPEN;
		renewed_caps(mdsc, session, 0);
		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		/* only honor the renewal we actually requested */
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info("mds%d reconnect denied\n", session->s_mds);
		remove_session_caps(session);
		wake = 1; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		kick_requests(mdsc, mds);
		break;

	case CEPH_SESSION_STALE:
		pr_info("mds%d caps went stale, renewing\n",
			session->s_mds);
		/* bump the cap generation and expire the ttl */
		spin_lock(&session->s_gen_ttl_lock);
		session->s_cap_gen++;
		session->s_cap_ttl = jiffies - 1;
		spin_unlock(&session->s_gen_ttl_lock);
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	default:
		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		/* resubmit requests that were waiting on this session */
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);
	}
	return;

bad:
	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
	       (int)msg->front.iov_len);
	ceph_msg_dump(msg);
	return;
}
2391
2392
2393 /*
2394  * called under session->mutex.
2395  */
2396 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2397                                    struct ceph_mds_session *session)
2398 {
2399         struct ceph_mds_request *req, *nreq;
2400         int err;
2401
2402         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2403
2404         mutex_lock(&mdsc->mutex);
2405         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2406                 err = __prepare_send_request(mdsc, req, session->s_mds);
2407                 if (!err) {
2408                         ceph_msg_get(req->r_request);
2409                         ceph_con_send(&session->s_con, req->r_request);
2410                 }
2411         }
2412         mutex_unlock(&mdsc->mutex);
2413 }
2414
2415 /*
2416  * Encode information about a cap for a reconnect with the MDS.
2417  */
2418 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2419                           void *arg)
2420 {
2421         union {
2422                 struct ceph_mds_cap_reconnect v2;
2423                 struct ceph_mds_cap_reconnect_v1 v1;
2424         } rec;
2425         size_t reclen;
2426         struct ceph_inode_info *ci;
2427         struct ceph_reconnect_state *recon_state = arg;
2428         struct ceph_pagelist *pagelist = recon_state->pagelist;
2429         char *path;
2430         int pathlen, err;
2431         u64 pathbase;
2432         struct dentry *dentry;
2433
2434         ci = cap->ci;
2435
2436         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2437              inode, ceph_vinop(inode), cap, cap->cap_id,
2438              ceph_cap_string(cap->issued));
2439         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2440         if (err)
2441                 return err;
2442
2443         dentry = d_find_alias(inode);
2444         if (dentry) {
2445                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2446                 if (IS_ERR(path)) {
2447                         err = PTR_ERR(path);
2448                         goto out_dput;
2449                 }
2450         } else {
2451                 path = NULL;
2452                 pathlen = 0;
2453         }
2454         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2455         if (err)
2456                 goto out_free;
2457
2458         spin_lock(&ci->i_ceph_lock);
2459         cap->seq = 0;        /* reset cap seq */
2460         cap->issue_seq = 0;  /* and issue_seq */
2461
2462         if (recon_state->flock) {
2463                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2464                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2465                 rec.v2.issued = cpu_to_le32(cap->issued);
2466                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2467                 rec.v2.pathbase = cpu_to_le64(pathbase);
2468                 rec.v2.flock_len = 0;
2469                 reclen = sizeof(rec.v2);
2470         } else {
2471                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2472                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2473                 rec.v1.issued = cpu_to_le32(cap->issued);
2474                 rec.v1.size = cpu_to_le64(inode->i_size);
2475                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2476                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2477                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2478                 rec.v1.pathbase = cpu_to_le64(pathbase);
2479                 reclen = sizeof(rec.v1);
2480         }
2481         spin_unlock(&ci->i_ceph_lock);
2482
2483         if (recon_state->flock) {
2484                 int num_fcntl_locks, num_flock_locks;
2485                 struct ceph_filelock *flocks;
2486
2487 encode_again:
2488                 lock_flocks();
2489                 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2490                 unlock_flocks();
2491                 flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2492                                  sizeof(struct ceph_filelock), GFP_NOFS);
2493                 if (!flocks) {
2494                         err = -ENOMEM;
2495                         goto out_free;
2496                 }
2497                 lock_flocks();
2498                 err = ceph_encode_locks_to_buffer(inode, flocks,
2499                                                   num_fcntl_locks,
2500                                                   num_flock_locks);
2501                 unlock_flocks();
2502                 if (err) {
2503                         kfree(flocks);
2504                         if (err == -ENOSPC)
2505                                 goto encode_again;
2506                         goto out_free;
2507                 }
2508                 /*
2509                  * number of encoded locks is stable, so copy to pagelist
2510                  */
2511                 rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
2512                                     (num_fcntl_locks+num_flock_locks) *
2513                                     sizeof(struct ceph_filelock));
2514                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2515                 if (!err)
2516                         err = ceph_locks_to_pagelist(flocks, pagelist,
2517                                                      num_fcntl_locks,
2518                                                      num_flock_locks);
2519                 kfree(flocks);
2520         } else {
2521                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2522         }
2523 out_free:
2524         kfree(path);
2525 out_dput:
2526         dput(dentry);
2527         return err;
2528 }
2529
2530
2531 /*
2532  * If an MDS fails and recovers, clients need to reconnect in order to
2533  * reestablish shared state.  This includes all caps issued through
2534  * this session _and_ the snap_realm hierarchy.  Because it's not
2535  * clear which snap realms the mds cares about, we send everything we
2536  * know about.. that ensures we'll then get any new info the
2537  * recovering MDS might have.
2538  *
2539  * This is a relatively heavyweight operation, but it's rare.
2540  *
2541  * called with mdsc->mutex held.
2542  */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *session)
{
	struct ceph_msg *reply;
	struct rb_node *p;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_pagelist *pagelist;
	struct ceph_reconnect_state recon_state;

	pr_info("mds%d reconnect start\n", mds);

	/* the pagelist accumulates the variable-length reconnect payload */
	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		goto fail_nopagelist;
	ceph_pagelist_init(pagelist);

	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	/* re-open the connection to the (recovered) mds */
	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	/* hold snap_rwsem so the realm tree is stable while we encode it */
	down_read(&mdsc->snap_rwsem);

	dout("session %p state %s\n", session,
	     session_state_name(session->s_state));

	/* drop old cap expires; we're about to reestablish that state */
	discard_cap_releases(mdsc, session);

	/* traverse this session's caps */
	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
	if (err)
		goto fail;

	recon_state.pagelist = pagelist;
	/* only encode file lock state (v2 records) if the peer supports it */
	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
	if (err < 0)
		goto fail;

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
			rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		dout(" adding snap realm %llx seq %lld parent %llx\n",
		     realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);
		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;
	}

	/* v2 of the reconnect message carries the flock records */
	if (recon_state.flock)
		reply->hdr.version = cpu_to_le16(2);
	if (pagelist->length) {
		/* set up outbound data if we have any */
		reply->hdr.data_len = cpu_to_le32(pagelist->length);
		ceph_msg_data_add_pagelist(reply, pagelist);
	}
	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	/* kick any requests that were waiting on this session */
	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(pagelist);
	kfree(pagelist);
fail_nopagelist:
	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
	return;
}
2644
2645
2646 /*
2647  * compare old and new mdsmaps, kicking requests
2648  * and closing out old connections as necessary
2649  *
2650  * called under mdsc->mutex.
2651  */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i;
	int oldstate, newstate;
	struct ceph_mds_session *s;

	dout("check_new_map new %u old %u\n",
	     newmap->m_epoch, oldmap->m_epoch);

	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i] == NULL)
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
		     i, ceph_mds_state_name(oldstate),
		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		     ceph_mds_state_name(newstate),
		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		     session_state_name(s->s_state));

		/* did this mds vanish from the map, or change address? */
		if (i >= newmap->m_max_mds ||
		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
				/* the session never opened, just close it
				 * out now */
				__wake_requests(mdsc, &s->s_waiting);
				__unregister_session(mdsc, s);
			} else {
				/* just close it */
				/* drop mdsc->mutex so we can take s_mutex,
				 * then re-take it in that order */
				mutex_unlock(&mdsc->mutex);
				mutex_lock(&s->s_mutex);
				mutex_lock(&mdsc->mutex);
				ceph_con_close(&s->s_con);
				mutex_unlock(&s->s_mutex);
				s->s_state = CEPH_MDS_SESSION_RESTARTING;
			}

			/* kick any requests waiting on the recovering mds */
			kick_requests(mdsc, i);
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			/* send_mds_reconnect takes s_mutex itself */
			mutex_unlock(&mdsc->mutex);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info("mds%d recovery completed\n", s->s_mds);
			kick_requests(mdsc, i);
			ceph_kick_flushing_caps(mdsc, s);
			wake_up_session_caps(s, 1);
		}
	}

	/* second pass: open export-target sessions for any laggy mds */
	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout(" connecting to export targets of laggy mds%d\n",
			     i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}
2741
2742
2743
2744 /*
2745  * leases
2746  */
2747
2748 /*
2749  * caller must hold session s_mutex, dentry->d_lock
2750  */
2751 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2752 {
2753         struct ceph_dentry_info *di = ceph_dentry(dentry);
2754
2755         ceph_put_mds_session(di->lease_session);
2756         di->lease_session = NULL;
2757 }
2758
static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	dout("handle_lease from mds%d\n", mds);

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	/* the dentry name follows the header and its u32 length field */
	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
	if (dname.len != get_unaligned_le32(h+1))
		goto bad;

	mutex_lock(&session->s_mutex);
	session->s_seq++;

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	dout("handle_lease %s, ino %llx %p %.*s\n",
	     ceph_lease_op_name(h->action), vino.ino, inode,
	     dname.len, dname.name);
	if (inode == NULL) {
		dout("handle_lease no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		dout("no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			/*
			 * if our seq is newer, report it back in the
			 * (reused) ack message below
			 */
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == session->s_cap_gen &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				le32_to_cpu(h->duration_ms) * HZ / 1000;

			di->lease_seq = seq;
			dentry->d_time = di->lease_renew_from + duration;
			/* attempt the next renewal at half the lease time */
			di->lease_renew_after = di->lease_renew_from +
				(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	iput(inode);
	mutex_unlock(&session->s_mutex);
	return;

bad:
	pr_err("corrupt lease message\n");
	ceph_msg_dump(msg);
}
2862
2863 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2864                               struct inode *inode,
2865                               struct dentry *dentry, char action,
2866                               u32 seq)
2867 {
2868         struct ceph_msg *msg;
2869         struct ceph_mds_lease *lease;
2870         int len = sizeof(*lease) + sizeof(u32);
2871         int dnamelen = 0;
2872
2873         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2874              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2875         dnamelen = dentry->d_name.len;
2876         len += dnamelen;
2877
2878         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
2879         if (!msg)
2880                 return;
2881         lease = msg->front.iov_base;
2882         lease->action = action;
2883         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2884         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2885         lease->seq = cpu_to_le32(seq);
2886         put_unaligned_le32(dnamelen, lease + 1);
2887         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2888
2889         /*
2890          * if this is a preemptive lease RELEASE, no need to
2891          * flush request stream, since the actual request will
2892          * soon follow.
2893          */
2894         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2895
2896         ceph_con_send(&session->s_con, msg);
2897 }
2898
2899 /*
2900  * Preemptively release a lease we expect to invalidate anyway.
2901  * Pass @inode always, @dentry is optional.
2902  */
void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
                             struct dentry *dentry)
{
	struct ceph_dentry_info *di;
	struct ceph_mds_session *session;
	u32 seq;

	BUG_ON(inode == NULL);
	BUG_ON(dentry == NULL);

	/* is dentry lease valid? */
	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	if (!di || !di->lease_session ||
	    di->lease_session->s_mds < 0 ||
	    di->lease_gen != di->lease_session->s_cap_gen ||
	    !time_before(jiffies, dentry->d_time)) {
		dout("lease_release inode %p dentry %p -- "
		     "no lease\n",
		     inode, dentry);
		spin_unlock(&dentry->d_lock);
		return;
	}

	/* we do have a lease on this dentry; note mds and seq */
	session = ceph_get_mds_session(di->lease_session);
	seq = di->lease_seq;
	__ceph_mdsc_drop_dentry_lease(dentry);
	spin_unlock(&dentry->d_lock);

	/* send the release message outside of d_lock */
	dout("lease_release inode %p dentry %p to mds%d\n",
	     inode, dentry, session->s_mds);
	ceph_mdsc_lease_send_msg(session, inode, dentry,
				 CEPH_MDS_LEASE_RELEASE, seq);
	ceph_put_mds_session(session);
}
2939
2940 /*
2941  * drop all leases (and dentry refs) in preparation for umount
2942  */
2943 static void drop_leases(struct ceph_mds_client *mdsc)
2944 {
2945         int i;
2946
2947         dout("drop_leases\n");
2948         mutex_lock(&mdsc->mutex);
2949         for (i = 0; i < mdsc->max_sessions; i++) {
2950                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2951                 if (!s)
2952                         continue;
2953                 mutex_unlock(&mdsc->mutex);
2954                 mutex_lock(&s->s_mutex);
2955                 mutex_unlock(&s->s_mutex);
2956                 ceph_put_mds_session(s);
2957                 mutex_lock(&mdsc->mutex);
2958         }
2959         mutex_unlock(&mdsc->mutex);
2960 }
2961
2962
2963
2964 /*
2965  * delayed work -- periodically trim expired leases, renew caps with mds
2966  */
2967 static void schedule_delayed(struct ceph_mds_client *mdsc)
2968 {
2969         int delay = 5;
2970         unsigned hz = round_jiffies_relative(HZ * delay);
2971         schedule_delayed_work(&mdsc->delayed_work, hz);
2972 }
2973
/*
 * periodic work: check delayed caps, renew/keepalive each session,
 * and push out pending cap releases.
 */
static void delayed_work(struct work_struct *work)
{
	int i;
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	int renew_interval;
	int renew_caps;

	dout("mdsc delayed_work\n");
	ceph_check_delayed_caps(mdsc);

	mutex_lock(&mdsc->mutex);
	/* renew caps once every quarter of the mds session timeout */
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (s == NULL)
			continue;
		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout("resending session close request for mds%d\n",
			     s->s_mds);
			request_close_session(mdsc, s);
			ceph_put_mds_session(s);
			continue;
		}
		/* an open session whose ttl has lapsed is marked hung */
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
				s->s_state = CEPH_MDS_SESSION_HUNG;
				pr_info("mds%d hung\n", s->s_mds);
			}
		}
		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
			/* this mds is failed or recovering, just wait */
			ceph_put_mds_session(s);
			continue;
		}
		/* drop mdsc->mutex before taking the session mutex */
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		ceph_add_cap_releases(mdsc, s);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	/* re-arm ourselves for the next pass */
	schedule_delayed(mdsc);
}
3034
3035 int ceph_mdsc_init(struct ceph_fs_client *fsc)
3036
3037 {
3038         struct ceph_mds_client *mdsc;
3039
3040         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3041         if (!mdsc)
3042                 return -ENOMEM;
3043         mdsc->fsc = fsc;
3044         fsc->mdsc = mdsc;
3045         mutex_init(&mdsc->mutex);
3046         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3047         if (mdsc->mdsmap == NULL)
3048                 return -ENOMEM;
3049
3050         init_completion(&mdsc->safe_umount_waiters);
3051         init_waitqueue_head(&mdsc->session_close_wq);
3052         INIT_LIST_HEAD(&mdsc->waiting_for_map);
3053         mdsc->sessions = NULL;
3054         mdsc->max_sessions = 0;
3055         mdsc->stopping = 0;
3056         init_rwsem(&mdsc->snap_rwsem);
3057         mdsc->snap_realms = RB_ROOT;
3058         INIT_LIST_HEAD(&mdsc->snap_empty);
3059         spin_lock_init(&mdsc->snap_empty_lock);
3060         mdsc->last_tid = 0;
3061         mdsc->request_tree = RB_ROOT;
3062         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3063         mdsc->last_renew_caps = jiffies;
3064         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3065         spin_lock_init(&mdsc->cap_delay_lock);
3066         INIT_LIST_HEAD(&mdsc->snap_flush_list);
3067         spin_lock_init(&mdsc->snap_flush_lock);
3068         mdsc->cap_flush_seq = 0;
3069         INIT_LIST_HEAD(&mdsc->cap_dirty);
3070         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3071         mdsc->num_cap_flushing = 0;
3072         spin_lock_init(&mdsc->cap_dirty_lock);
3073         init_waitqueue_head(&mdsc->cap_flushing_wq);
3074         spin_lock_init(&mdsc->dentry_lru_lock);
3075         INIT_LIST_HEAD(&mdsc->dentry_lru);
3076
3077         ceph_caps_init(mdsc);
3078         ceph_adjust_min_caps(mdsc, fsc->min_caps);
3079
3080         return 0;
3081 }
3082
3083 /*
3084  * Wait for safe replies on open mds requests.  If we time out, drop
3085  * all requests from the tree to avoid dangling dentry refs.
3086  */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_request *req;
	struct ceph_fs_client *fsc = mdsc->fsc;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		dout("wait_requests waiting for requests\n");
		/* bounded wait: mount_timeout seconds converted to jiffies */
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    fsc->client->options->mount_timeout * HZ);

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			dout("wait_requests timed out on tid %llu\n",
			     req->r_tid);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_requests done\n");
}
3111
3112 /*
3113  * called before mount is ro, and before dentries are torn down.
3114  * (hmm, does this still race with new lookups?)
3115  */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	dout("pre_umount\n");
	/* mark that we are shutting down */
	mdsc->stopping = 1;

	drop_leases(mdsc);
	/* push dirty caps to the mds, then wait out open requests */
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();
}
3131
3132 /*
3133  * wait for all write mds requests to flush.
3134  */
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
{
	struct ceph_mds_request *req = NULL, *nextreq;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	dout("wait_unsafe_requests want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
			/* write op */
			/*
			 * take refs on req and nextreq so neither can be
			 * freed while we drop mdsc->mutex to sleep below
			 */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			mutex_unlock(&mdsc->mutex);
			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);
			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break;  /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq);  /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_unsafe_requests done\n");
}
3176
3177 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3178 {
3179         u64 want_tid, want_flush;
3180
3181         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3182                 return;
3183
3184         dout("sync\n");
3185         mutex_lock(&mdsc->mutex);
3186         want_tid = mdsc->last_tid;
3187         want_flush = mdsc->cap_flush_seq;
3188         mutex_unlock(&mdsc->mutex);
3189         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3190
3191         ceph_flush_dirty_caps(mdsc);
3192
3193         wait_unsafe_requests(mdsc, want_tid);
3194         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3195 }
3196
3197 /*
3198  * true if all sessions are closed, or we force unmount
3199  */
3200 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3201 {
3202         int i, n = 0;
3203
3204         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3205                 return true;
3206
3207         mutex_lock(&mdsc->mutex);
3208         for (i = 0; i < mdsc->max_sessions; i++)
3209                 if (mdsc->sessions[i])
3210                         n++;
3211         mutex_unlock(&mdsc->mutex);
3212         return n == 0;
3213 }
3214
3215 /*
3216  * called after sb is ro.
3217  */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int i;
	struct ceph_fs_client *fsc = mdsc->fsc;
	unsigned long timeout = fsc->client->options->mount_timeout * HZ;

	dout("close_sessions\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		/* drop mdsc->mutex before taking the session mutex */
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	dout("waiting for sessions to close\n");
	/* bounded wait for the closes to be acknowledged */
	wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
			   timeout);

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = get_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_empty_realms(mdsc);

	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	dout("stopped\n");
}
3269
/*
 * Tear down the remaining mdsc state: stop the delayed work timer,
 * release the current mdsmap (if any), and free the session array and
 * capability bookkeeping.  Must run after the sessions themselves have
 * been closed and torn down (see the close-sessions path above).
 */
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
        dout("stop\n");
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
        if (mdsc->mdsmap)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
        ceph_caps_finalize(mdsc);
}
3279
3280 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3281 {
3282         struct ceph_mds_client *mdsc = fsc->mdsc;
3283
3284         dout("mdsc_destroy %p\n", mdsc);
3285         ceph_mdsc_stop(mdsc);
3286
3287         /* flush out any connection work with references to us */
3288         ceph_msgr_flush();
3289
3290         fsc->mdsc = NULL;
3291         kfree(mdsc);
3292         dout("mdsc_destroy %p done\n", mdsc);
3293 }
3294
3295
3296 /*
3297  * handle mds map update.
3298  */
/*
 * Handle an incoming MDS map message: decode it, and if it is newer
 * than the map we hold, swap it into place, kick any requests waiting
 * for a map, and reschedule the delayed work.
 *
 * Stale or duplicate epochs are ignored; decode failures are logged.
 */
void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
        u32 epoch;
        u32 maplen;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        struct ceph_mdsmap *newmap, *oldmap;
        struct ceph_fsid fsid;
        int err = -EINVAL;

        /* fsid + epoch + map length must be present up front */
        ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
        if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
                return;
        epoch = ceph_decode_32(&p);
        maplen = ceph_decode_32(&p);
        dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

        /* do we need it? */
        ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
        mutex_lock(&mdsc->mutex);
        if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
                /* already have this epoch (or newer); nothing to do */
                dout("handle_map epoch %u <= our %u\n",
                     epoch, mdsc->mdsmap->m_epoch);
                mutex_unlock(&mdsc->mutex);
                return;
        }

        newmap = ceph_mdsmap_decode(&p, end);
        if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad_unlock;
        }

        /* swap into place */
        if (mdsc->mdsmap) {
                oldmap = mdsc->mdsmap;
                mdsc->mdsmap = newmap;
                /* react to MDS state changes between the two maps */
                check_new_map(mdsc, newmap, oldmap);
                ceph_mdsmap_destroy(oldmap);
        } else {
                mdsc->mdsmap = newmap;  /* first mds map */
        }
        /* the map carries the cluster-wide max file size */
        mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;

        /* requests queued waiting for an mdsmap can now proceed */
        __wake_requests(mdsc, &mdsc->waiting_for_map);

        mutex_unlock(&mdsc->mutex);
        schedule_delayed(mdsc);
        return;

bad_unlock:
        mutex_unlock(&mdsc->mutex);
bad:
        pr_err("error decoding mdsmap %d\n", err);
        return;
}
3356
3357 static struct ceph_connection *con_get(struct ceph_connection *con)
3358 {
3359         struct ceph_mds_session *s = con->private;
3360
3361         if (get_session(s)) {
3362                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3363                 return con;
3364         }
3365         dout("mdsc con_get %p FAIL\n", s);
3366         return NULL;
3367 }
3368
3369 static void con_put(struct ceph_connection *con)
3370 {
3371         struct ceph_mds_session *s = con->private;
3372
3373         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3374         ceph_put_mds_session(s);
3375 }
3376
3377 /*
3378  * if the client is unresponsive for long enough, the mds will kill
3379  * the session entirely.
3380  */
3381 static void peer_reset(struct ceph_connection *con)
3382 {
3383         struct ceph_mds_session *s = con->private;
3384         struct ceph_mds_client *mdsc = s->s_mdsc;
3385
3386         pr_warning("mds%d closed our session\n", s->s_mds);
3387         send_mds_reconnect(mdsc, s);
3388 }
3389
/*
 * Dispatch an incoming message from an MDS to the appropriate handler
 * based on its type.  Messages for sessions that are no longer
 * registered are dropped.  The message reference is consumed here in
 * all cases.
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        int type = le16_to_cpu(msg->hdr.type);

        /* ignore messages for sessions we have already unregistered */
        mutex_lock(&mdsc->mutex);
        if (__verify_registered_session(mdsc, s) < 0) {
                mutex_unlock(&mdsc->mutex);
                goto out;
        }
        mutex_unlock(&mdsc->mutex);

        switch (type) {
        case CEPH_MSG_MDS_MAP:
                ceph_mdsc_handle_map(mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SESSION:
                handle_session(s, msg);
                break;
        case CEPH_MSG_CLIENT_REPLY:
                handle_reply(s, msg);
                break;
        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
                handle_forward(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
                ceph_handle_caps(s, msg);
                break;
        case CEPH_MSG_CLIENT_SNAP:
                ceph_handle_snap(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_LEASE:
                handle_lease(mdsc, s, msg);
                break;

        default:
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
out:
        /* drop the message reference taken by the messenger */
        ceph_msg_put(msg);
}
3433
3434 /*
3435  * authentication
3436  */
3437
3438 /*
3439  * Note: returned pointer is the address of a structure that's
3440  * managed separately.  Caller must *not* attempt to free it.
3441  */
3442 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3443                                         int *proto, int force_new)
3444 {
3445         struct ceph_mds_session *s = con->private;
3446         struct ceph_mds_client *mdsc = s->s_mdsc;
3447         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3448         struct ceph_auth_handshake *auth = &s->s_auth;
3449
3450         if (force_new && auth->authorizer) {
3451                 ceph_auth_destroy_authorizer(ac, auth->authorizer);
3452                 auth->authorizer = NULL;
3453         }
3454         if (!auth->authorizer) {
3455                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3456                                                       auth);
3457                 if (ret)
3458                         return ERR_PTR(ret);
3459         } else {
3460                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3461                                                       auth);
3462                 if (ret)
3463                         return ERR_PTR(ret);
3464         }
3465         *proto = ac->protocol;
3466
3467         return auth;
3468 }
3469
3470
3471 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3472 {
3473         struct ceph_mds_session *s = con->private;
3474         struct ceph_mds_client *mdsc = s->s_mdsc;
3475         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3476
3477         return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3478 }
3479
3480 static int invalidate_authorizer(struct ceph_connection *con)
3481 {
3482         struct ceph_mds_session *s = con->private;
3483         struct ceph_mds_client *mdsc = s->s_mdsc;
3484         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3485
3486         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3487
3488         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3489 }
3490
3491 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3492                                 struct ceph_msg_header *hdr, int *skip)
3493 {
3494         struct ceph_msg *msg;
3495         int type = (int) le16_to_cpu(hdr->type);
3496         int front_len = (int) le32_to_cpu(hdr->front_len);
3497
3498         if (con->in_msg)
3499                 return con->in_msg;
3500
3501         *skip = 0;
3502         msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3503         if (!msg) {
3504                 pr_err("unable to allocate msg type %d len %d\n",
3505                        type, front_len);
3506                 return NULL;
3507         }
3508
3509         return msg;
3510 }
3511
/* messenger callbacks for connections to MDS daemons */
static const struct ceph_connection_operations mds_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .get_authorizer = get_authorizer,
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .peer_reset = peer_reset,
        .alloc_msg = mds_alloc_msg,
};
3522
3523 /* eof */