libceph: rename MAX_OBJ_NAME_SIZE to CEPH_MAX_OID_NAME_LEN
[firefly-linux-kernel-4.4.55.git] / net / ceph / osd_client.c
index 2b4b32aaa893b3117043e6a218fcde6c58f0aff4..a053e7e4a780b01a79cd9d272ad42c680fa8cd66 100644 (file)
@@ -338,7 +338,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        msg_size = 4 + 4 + 8 + 8 + 4+8;
        msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
        msg_size += 1 + 8 + 4 + 4;     /* pg_t */
-       msg_size += 4 + MAX_OBJ_NAME_SIZE;
+       msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
        msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
        msg_size += 8;  /* snapid */
        msg_size += 8;  /* snap_seq */
@@ -368,6 +368,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);
 
+       req->r_oloc.pool = -1;
+
        /* create reply message */
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -761,7 +763,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (num_ops > 1)
                osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
 
-       req->r_file_layout = *layout;  /* keep a copy */
+       req->r_oloc.pool = ceph_file_layout_pg_pool(*layout);
 
        snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx",
                vino.ino, objnum);
@@ -1044,8 +1046,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
                        !ceph_con_opened(&osd->o_con)) {
                struct ceph_osd_request *req;
 
-               dout(" osd addr hasn't changed and connection never opened,"
-                    " letting msgr retry");
+               dout("osd addr hasn't changed and connection never opened, "
+                    "letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benfit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;
@@ -1231,6 +1233,22 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
 
+/*
+ * Returns whether a request should be blocked from being sent
+ * based on the current osdmap and osd_client settings.
+ *
+ * Caller should hold map_sem for read.
+ */
+static bool __req_should_be_paused(struct ceph_osd_client *osdc,
+                                  struct ceph_osd_request *req)
+{
+       bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+       bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+       return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
+               (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
+}
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -1248,10 +1266,11 @@ static int __map_request(struct ceph_osd_client *osdc,
        int acting[CEPH_PG_MAX_SIZE];
        int o = -1, num = 0;
        int err;
+       bool was_paused;
 
        dout("map_request %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
-                               ceph_file_layout_pg_pool(req->r_file_layout));
+                               req->r_oloc.pool);
        if (err) {
                list_move(&req->r_req_lru_item, &osdc->req_notarget);
                return err;
@@ -1264,12 +1283,18 @@ static int __map_request(struct ceph_osd_client *osdc,
                num = err;
        }
 
+       was_paused = req->r_paused;
+       req->r_paused = __req_should_be_paused(osdc, req);
+       if (was_paused && !req->r_paused)
+               force_resend = 1;
+
        if ((!force_resend &&
             req->r_osd && req->r_osd->o_osd == o &&
             req->r_sent >= req->r_osd->o_incarnation &&
             req->r_num_pg_osds == num &&
             memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
-           (req->r_osd == NULL && o == -1))
+           (req->r_osd == NULL && o == -1) ||
+           req->r_paused)
                return 0;  /* no change */
 
        dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
@@ -1331,7 +1356,7 @@ static void __send_request(struct ceph_osd_client *osdc,
        /* fill in message content that changes each time we send it */
        put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
        put_unaligned_le32(req->r_flags, req->r_request_flags);
-       put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
+       put_unaligned_le64(req->r_oloc.pool, req->r_request_pool);
        p = req->r_request_pgid;
        ceph_encode_64(&p, req->r_pgid.pool);
        ceph_encode_32(&p, req->r_pgid.seed);
@@ -1581,6 +1606,13 @@ done:
        return;
 
 bad_put:
+       req->r_result = -EIO;
+       __unregister_request(osdc, req);
+       if (req->r_callback)
+               req->r_callback(req, msg);
+       else
+               complete_all(&req->r_completion);
+       complete_request(req);
        ceph_osdc_put_request(req);
 bad_mutex:
        mutex_unlock(&osdc->request_mutex);
@@ -1613,14 +1645,17 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  *
  * Caller should hold map_sem for read.
  */
-static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
+static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
+                         bool force_resend_writes)
 {
        struct ceph_osd_request *req, *nreq;
        struct rb_node *p;
        int needmap = 0;
        int err;
+       bool force_resend_req;
 
-       dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
+       dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
+               force_resend_writes ? " (force resend writes)" : "");
        mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; ) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
@@ -1645,7 +1680,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                        continue;
                }
 
-               err = __map_request(osdc, req, force_resend);
+               force_resend_req = force_resend ||
+                       (force_resend_writes &&
+                               req->r_flags & CEPH_OSD_FLAG_WRITE);
+               err = __map_request(osdc, req, force_resend_req);
                if (err < 0)
                        continue;  /* error */
                if (req->r_osd == NULL) {
@@ -1665,7 +1703,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                                 r_linger_item) {
                dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
-               err = __map_request(osdc, req, force_resend);
+               err = __map_request(osdc, req,
+                                   force_resend || force_resend_writes);
                dout("__map_request returned %d\n", err);
                if (err == 0)
                        continue;  /* no change and no osd was specified */
@@ -1707,6 +1746,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        struct ceph_osdmap *newmap = NULL, *oldmap;
        int err;
        struct ceph_fsid fsid;
+       bool was_full;
 
        dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
        p = msg->front.iov_base;
@@ -1720,6 +1760,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
        down_write(&osdc->map_sem);
 
+       was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+
        /* incremental maps */
        ceph_decode_32_safe(&p, end, nr_maps, bad);
        dout(" %d inc maps\n", nr_maps);
@@ -1744,7 +1786,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                ceph_osdmap_destroy(osdc->osdmap);
                                osdc->osdmap = newmap;
                        }
-                       kick_requests(osdc, 0);
+                       was_full = was_full ||
+                               ceph_osdmap_flag(osdc->osdmap,
+                                                CEPH_OSDMAP_FULL);
+                       kick_requests(osdc, 0, was_full);
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
@@ -1787,7 +1832,10 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                        skipped_map = 1;
                                ceph_osdmap_destroy(oldmap);
                        }
-                       kick_requests(osdc, skipped_map);
+                       was_full = was_full ||
+                               ceph_osdmap_flag(osdc->osdmap,
+                                                CEPH_OSDMAP_FULL);
+                       kick_requests(osdc, skipped_map, was_full);
                }
                p += maplen;
                nr_maps--;
@@ -1804,7 +1852,9 @@ done:
         * we find out when we are no longer full and stop returning
         * ENOSPC.
         */
-       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
+       if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+               ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
                ceph_monc_request_next_osdmap(&osdc->client->monc);
 
        mutex_lock(&osdc->request_mutex);
@@ -2454,7 +2504,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        struct ceph_osd_client *osdc = osd->o_osdc;
        struct ceph_msg *m;
        struct ceph_osd_request *req;
-       int front = le32_to_cpu(hdr->front_len);
+       int front_len = le32_to_cpu(hdr->front_len);
        int data_len = le32_to_cpu(hdr->data_len);
        u64 tid;
 
@@ -2474,12 +2524,13 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                     req->r_reply, req->r_reply->con);
        ceph_msg_revoke_incoming(req->r_reply);
 
-       if (front > req->r_reply->front.iov_len) {
+       if (front_len > req->r_reply->front_alloc_len) {
                pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
-                          front, (int)req->r_reply->front.iov_len,
+                          front_len, req->r_reply->front_alloc_len,
                           (unsigned int)con->peer_name.type,
                           le64_to_cpu(con->peer_name.num));
-               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
+               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
+                                false);
                if (!m)
                        goto out;
                ceph_msg_put(req->r_reply);