diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b1b8ef864d58..449847badcd8 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -183,9 +183,31 @@ struct rbd_obj_request { u64 length; /* bytes from offset */ unsigned long flags; - struct rbd_img_request *img_request; - u64 img_offset; /* image relative offset */ - struct list_head links; /* img_request->obj_requests */ + /* + * An object request associated with an image will have its + * img_data flag set; a standalone object request will not. + * + * A standalone object request will have which == BAD_WHICH + * and a null obj_request pointer. + * + * An object request initiated in support of a layered image + * object (to check for its existence before a write) will + * have which == BAD_WHICH and a non-null obj_request pointer. + * + * Finally, an object request for rbd image data will have + * which != BAD_WHICH, and will have a non-null img_request + * pointer. The value of which will be in the range + * 0..(img_request->obj_request_count-1). + */ + union { + struct rbd_obj_request *obj_request; /* STAT op */ + struct { + struct rbd_img_request *img_request; + u64 img_offset; + /* links for img_request->obj_requests list */ + struct list_head links; + }; + }; u32 which; /* posn image request list */ enum obj_request_type type; @@ -1656,10 +1678,6 @@ static struct rbd_img_request *rbd_img_request_create( INIT_LIST_HEAD(&img_request->obj_requests); kref_init(&img_request->kref); - (void) obj_request_existence_set; - (void) obj_request_known_test; - (void) obj_request_exists_test; - rbd_img_request_get(img_request); /* Avoid a warning */ rbd_img_request_put(img_request); /* TEMPORARY */ @@ -1847,18 +1865,147 @@ out_unwind: return -ENOMEM; } +static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) +{ + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct rbd_obj_request *orig_request; + int result; + + rbd_assert(!obj_request_img_data_test(obj_request)); + + /* + * All we need from the object request is the original + * request and the result of the STAT op. Grab those, then + * we're done with the request. + */ + orig_request = obj_request->obj_request; + obj_request->obj_request = NULL; + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); + + result = obj_request->result; + obj_request->result = 0; + + dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, + obj_request, orig_request, result, + obj_request->xferred, obj_request->length); + rbd_obj_request_put(obj_request); + + rbd_assert(orig_request); + rbd_assert(orig_request->img_request); + rbd_dev = orig_request->img_request->rbd_dev; + osdc = &rbd_dev->rbd_client->client->osdc; + + /* + * Our only purpose here is to determine whether the object + * exists, and we don't want to treat the non-existence as + * an error. If something else comes back, transfer the + * error to the original request and complete it now. + */ + if (!result) { + obj_request_existence_set(orig_request, true); + } else if (result == -ENOENT) { + obj_request_existence_set(orig_request, false); + } else if (result) { + orig_request->result = result; + goto out_err; + } + + /* + * Resubmit the original request now that we have recorded + * whether the target object exists. + */ + orig_request->result = rbd_obj_request_submit(osdc, orig_request); +out_err: + if (orig_request->result) + rbd_obj_request_complete(orig_request); + rbd_obj_request_put(orig_request); +} + +static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) +{ + struct rbd_obj_request *stat_request; + struct rbd_device *rbd_dev; + struct ceph_osd_client *osdc; + struct page **pages = NULL; + u32 page_count; + size_t size; + int ret; + + /* + * The response data for a STAT call consists of: + * le64 length; + * struct { + * le32 tv_sec; + * le32 tv_nsec; + * } mtime; + */ + size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); + page_count = (u32)calc_pages_for(0, size); + pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + ret = -ENOMEM; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + goto out; + + rbd_obj_request_get(obj_request); + stat_request->obj_request = obj_request; + stat_request->pages = pages; + stat_request->page_count = page_count; + + rbd_assert(obj_request->img_request); + rbd_dev = obj_request->img_request->rbd_dev; + stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, + stat_request); + if (!stat_request->osd_req) + goto out; + stat_request->callback = rbd_img_obj_exists_callback; + + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); + rbd_osd_req_format(stat_request, false); + + osdc = &rbd_dev->rbd_client->client->osdc; + ret = rbd_obj_request_submit(osdc, stat_request); +out: + if (ret) + rbd_obj_request_put(obj_request); + + return ret; +} + static int rbd_img_request_submit(struct rbd_img_request *img_request) { struct rbd_device *rbd_dev = img_request->rbd_dev; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct rbd_obj_request *next_obj_request; + bool write_request = img_request_write_test(img_request); + bool layered = img_request_layered_test(img_request); dout("%s: img %p\n", __func__, img_request); for_each_obj_request_safe(img_request, obj_request, next_obj_request) { + bool known; + bool object_exists; int ret; - ret = rbd_obj_request_submit(osdc, obj_request); + /* + * We need to know whether the target object exists + * for a layered write. Issue an existence check + * first if we need to. + */ + known = obj_request_known_test(obj_request); + object_exists = known && obj_request_exists_test(obj_request); + if (!write_request || !layered || object_exists) + ret = rbd_obj_request_submit(osdc, obj_request); + else + ret = rbd_img_obj_exists_submit(obj_request); if (ret) return ret; }