From 3167893ae60e847b5a44d582fedb544cd0ae7ef5 Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Mon, 30 Jul 2018 23:55:36 +0800 Subject: [PATCH 01/25] ceph: reset cap hold timeout only for requeued inode __cap_delay_requeue() only requeue inode which does not have CEPH_I_FLUSH flag, so avoid reset cap hold timeout for that inode. Signed-off-by: Chengguang Xu Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index dd7dfdd2ba13..b26ae1673992 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -521,7 +521,6 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc, static void __cap_delay_requeue(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci) { - __cap_set_timeouts(mdsc, ci); dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode, ci->i_ceph_flags, ci->i_hold_caps_max); if (!mdsc->stopping) { @@ -531,6 +530,7 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc, goto no_change; list_del_init(&ci->i_cap_delay_list); } + __cap_set_timeouts(mdsc, ci); list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list); no_change: spin_unlock(&mdsc->cap_delay_lock); From 7d8dc53414c5770afb8b83db096ecab0cd53ec93 Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Sun, 12 Aug 2018 23:06:54 +0800 Subject: [PATCH 02/25] rbd: add __init/__exit annotations Add __init/__exit annotation to init/cleanup helpers which are only called once in the module. Signed-off-by: Chengguang Xu Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 73ed5f3a862d..f2fe692dda40 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -6067,7 +6067,7 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus, * create control files in sysfs * /sys/bus/rbd/... */ -static int rbd_sysfs_init(void) +static int __init rbd_sysfs_init(void) { int ret; @@ -6082,13 +6082,13 @@ static int rbd_sysfs_init(void) return ret; } -static void rbd_sysfs_cleanup(void) +static void __exit rbd_sysfs_cleanup(void) { bus_unregister(&rbd_bus_type); device_unregister(&rbd_root_dev); } -static int rbd_slab_init(void) +static int __init rbd_slab_init(void) { rbd_assert(!rbd_img_request_cache); rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0); From 5da207993e396f445d813338dd3540ddd6d26816 Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Sun, 2 Sep 2018 23:21:09 +0800 Subject: [PATCH 03/25] ceph: check snap first in ceph_set_acl() Do the snap check first in ceph_set_acl(), so we can avoid unnecessary operations when the inode has snap. Signed-off-by: Chengguang Xu Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/acl.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 027408d55aee..8a9b562ae4ca 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -104,6 +104,11 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type) struct timespec64 old_ctime = inode->i_ctime; umode_t new_mode = inode->i_mode, old_mode = inode->i_mode; + if (ceph_snap(inode) != CEPH_NOSNAP) { + ret = -EROFS; + goto out; + } + switch (type) { case ACL_TYPE_ACCESS: name = XATTR_NAME_POSIX_ACL_ACCESS; @@ -138,11 +143,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type) goto out_free; } - if (ceph_snap(inode) != CEPH_NOSNAP) { - ret = -EROFS; - goto out_free; - } - if (new_mode != old_mode) { newattrs.ia_ctime = current_time(inode); newattrs.ia_mode = new_mode; From efe328230dc01aa0b1269aad0b5fae73eea4677a Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 27 Sep 2018 21:16:05 +0800 Subject: [PATCH 04/25] Revert "ceph: fix dentry leak in splice_dentry()" This reverts commit 8b8f53af1ed9df88a4c0fbfdf3db58f62060edf3. splice_dentry() is used by three places. For two places, req->r_dentry is passed to splice_dentry(). In the case of error, req->r_dentry does not get updated. So splice_dentry() should not drop reference. Cc: stable@vger.kernel.org # 4.18+ Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index ebc7bdaed2d0..4055ab4d5c52 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1132,8 +1132,12 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in) if (IS_ERR(realdn)) { pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n", PTR_ERR(realdn), dn, in, ceph_vinop(in)); - dput(dn); - dn = realdn; /* note realdn contains the error */ + dn = realdn; + /* + * Caller should release 'dn' in the case of error. + * If 'req->r_dentry' is passed to this function, + * caller should leave 'req->r_dentry' untouched. + */ goto out; } else if (realdn) { dout("dn %p (%d) spliced with %p (%d) " From c58f450bd61511d897efc2ea472c69630635b557 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 28 Sep 2018 09:10:29 +0800 Subject: [PATCH 05/25] ceph: fix dentry leak in ceph_readdir_prepopulate Signed-off-by: "Yan, Zheng" Reviewed-by: Jeff Layton Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 1 - 1 file changed, 1 deletion(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 4055ab4d5c52..46254d53392f 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1681,7 +1681,6 @@ retry_lookup: if (IS_ERR(realdn)) { err = PTR_ERR(realdn); d_drop(dn); - dn = NULL; goto next_item; } dn = realdn; From 74c9e6bf4c888e41bc7db340fb94fade96394038 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 28 Sep 2018 11:34:42 +0800 Subject: [PATCH 06/25] ceph: check if LOOKUPNAME request was aborted when filling trace d_lookup()/d_alloc() require parent inode locked. Parent inode is not locked if request is aborted. Signed-off-by: "Yan, Zheng" Reviewed-by: Jeff Layton Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 46254d53392f..79dd5e6ed755 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1200,7 +1200,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) WARN_ON_ONCE(1); } - if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) { + if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME && + test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) && + !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { struct qstr dname; struct dentry *dn, *parent; From fce7a9744bdf3baa9d08c52d2c78f53ab49918a4 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 29 Sep 2018 16:02:19 +0800 Subject: [PATCH 07/25] ceph: refactor ceph_sync_read() Avoid allocating memory for the entire user request: striped_read() does a synchronous OSD request per object, so it doesn't need more than object size worth of pages at a time. [ Preserve the comment, changelog. ] Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/file.c | 219 ++++++++++++++++++++++++------------------------- 1 file changed, 106 insertions(+), 113 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 92ab20433682..846997481674 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -557,90 +557,26 @@ enum { }; /* - * Read a range of bytes striped over one or more objects. Iterate over - * objects we stripe over. (That's not atomic, but good enough for now.) + * Completely synchronous read and write methods. Direct from __user + * buffer to osd, or directly to user pages (if O_DIRECT). + * + * If the read spans object boundary, just do multiple reads. (That's not + * atomic, but good enough for now.) * * If we get a short result from the OSD, check against i_size; we need to * only return a short read to the caller if we hit EOF. */ -static int striped_read(struct inode *inode, - u64 pos, u64 len, - struct page **pages, int num_pages, - int page_align, int *checkeof) -{ - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_inode_info *ci = ceph_inode(inode); - u64 this_len; - loff_t i_size; - int page_idx; - int ret, read = 0; - bool hit_stripe, was_short; - - /* - * we may need to do multiple reads. not atomic, unfortunately. - */ -more: - this_len = len; - page_idx = (page_align + read) >> PAGE_SHIFT; - ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), - &ci->i_layout, pos, &this_len, - ci->i_truncate_seq, ci->i_truncate_size, - pages + page_idx, num_pages - page_idx, - ((page_align + read) & ~PAGE_MASK)); - if (ret == -ENOENT) - ret = 0; - hit_stripe = this_len < len; - was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read, - ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); - - i_size = i_size_read(inode); - if (ret >= 0) { - if (was_short && (pos + ret < i_size)) { - int zlen = min(this_len - ret, i_size - pos - ret); - int zoff = page_align + read + ret; - dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + zlen); - ceph_zero_page_vector_range(zoff, zlen, pages); - ret += zlen; - } - - read += ret; - pos += ret; - len -= ret; - - /* hit stripe and need continue*/ - if (len && hit_stripe && pos < i_size) - goto more; - } - - if (read > 0) { - ret = read; - /* did we bounce off eof? */ - if (pos + len > i_size) - *checkeof = CHECK_EOF; - } - - dout("striped_read returns %d\n", ret); - return ret; -} - -/* - * Completely synchronous read and write methods. Direct from __user - * buffer to osd, or directly to user pages (if O_DIRECT). - * - * If the read spans object boundary, just do multiple reads. - */ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, - int *checkeof) + int *retry_op) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); - struct page **pages; - u64 off = iocb->ki_pos; - int num_pages; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_client *osdc = &fsc->client->osdc; ssize_t ret; - size_t len = iov_iter_count(to); + u64 off = iocb->ki_pos; + u64 len = iov_iter_count(to); dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); @@ -653,61 +589,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait_range(inode->i_mapping, off, - off + len); + ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len); if (ret < 0) return ret; - if (unlikely(to->type & ITER_PIPE)) { + ret = 0; + while ((len = iov_iter_count(to)) > 0) { + struct ceph_osd_request *req; + struct page **pages; + int num_pages; size_t page_off; - ret = iov_iter_get_pages_alloc(to, &pages, len, - &page_off); - if (ret <= 0) - return -ENOMEM; - num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + u64 i_size; + bool more; - ret = striped_read(inode, off, ret, pages, num_pages, - page_off, checkeof); - if (ret > 0) { - iov_iter_advance(to, ret); - off += ret; - } else { - iov_iter_advance(to, 0); + req = ceph_osdc_new_request(osdc, &ci->i_layout, + ci->i_vino, off, &len, 0, 1, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, ci->i_truncate_seq, + ci->i_truncate_size, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + break; } - ceph_put_page_vector(pages, num_pages, false); - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - ret = striped_read(inode, off, len, pages, num_pages, - (off & ~PAGE_MASK), checkeof); - if (ret > 0) { - int l, k = 0; - size_t left = ret; + more = len < iov_iter_count(to); - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, left, - PAGE_SIZE - page_off); - l = copy_page_to_iter(pages[k++], page_off, - copy, to); - off += l; - left -= l; - if (l < copy) - break; + if (unlikely(to->type & ITER_PIPE)) { + ret = iov_iter_get_pages_alloc(to, &pages, len, + &page_off); + if (ret <= 0) { + ceph_osdc_put_request(req); + ret = -ENOMEM; + break; + } + num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + if (ret < len) { + len = ret; + osd_req_op_extent_update(req, 0, len); + more = false; + } + } else { + num_pages = calc_pages_for(off, len); + page_off = off & ~PAGE_MASK; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) { + ceph_osdc_put_request(req); + ret = PTR_ERR(pages); + break; } } - ceph_release_page_vector(pages, num_pages); + + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, + false, false); + ret = ceph_osdc_start_request(osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(osdc, req); + ceph_osdc_put_request(req); + + i_size = i_size_read(inode); + dout("sync_read %llu~%llu got %zd i_size %llu%s\n", + off, len, ret, i_size, (more ? " MORE" : "")); + + if (ret == -ENOENT) + ret = 0; + if (ret >= 0 && ret < len && (off + ret < i_size)) { + int zlen = min(len - ret, i_size - off - ret); + int zoff = page_off + ret; + dout("sync_read zero gap %llu~%llu\n", + off + ret, off + ret + zlen); + ceph_zero_page_vector_range(zoff, zlen, pages); + ret += zlen; + } + + if (unlikely(to->type & ITER_PIPE)) { + if (ret > 0) { + iov_iter_advance(to, ret); + off += ret; + } else { + iov_iter_advance(to, 0); + } + ceph_put_page_vector(pages, num_pages, false); + } else { + int idx = 0; + size_t left = ret > 0 ? ret : 0; + while (left > 0) { + size_t len, copied; + page_off = off & ~PAGE_MASK; + len = min_t(size_t, left, PAGE_SIZE - page_off); + copied = copy_page_to_iter(pages[idx++], + page_off, len, to); + off += copied; + left -= copied; + if (copied < len) { + ret = -EFAULT; + break; + } + } + ceph_release_page_vector(pages, num_pages); + } + + if (ret <= 0 || off >= i_size || !more) + break; } if (off > iocb->ki_pos) { + if (ret >= 0 && + iov_iter_count(to) > 0 && off >= i_size_read(inode)) + *retry_op = CHECK_EOF; ret = off - iocb->ki_pos; iocb->ki_pos = off; } - dout("sync_read result %zd\n", ret); + dout("sync_read result %zd retry_op %d\n", ret, *retry_op); return ret; } From bddff633ab7bc60a18a86ac8b322695b6f8594d0 Mon Sep 17 00:00:00 2001 From: Luis Henriques Date: Tue, 9 Oct 2018 18:54:28 +0100 Subject: [PATCH 08/25] ceph: only allow punch hole mode in fallocate Current implementation of cephfs fallocate isn't correct as it doesn't really reserve the space in the cluster, which means that a subsequent call to a write may actually fail due to lack of space. In fact, it is currently possible to fallocate an amount space that is larger than the free space in the cluster. It has behaved this way since the initial commit ad7a60de882a ("ceph: punch hole support"). Since there's no easy solution to fix this at the moment, this patch simply removes support for all fallocate operations but FALLOC_FL_PUNCH_HOLE (which implies FALLOC_FL_KEEP_SIZE). Link: https://tracker.ceph.com/issues/36317 Signed-off-by: Luis Henriques Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/file.c | 45 +++++++++------------------------------------ 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 846997481674..213e7d98248a 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1728,7 +1728,6 @@ static long ceph_fallocate(struct file *file, int mode, struct ceph_file_info *fi = file->private_data; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_cap_flush *prealloc_cf; int want, got = 0; int dirty; @@ -1736,10 +1735,7 @@ static long ceph_fallocate(struct file *file, int mode, loff_t endoff = 0; loff_t size; - if ((offset + length) > max(i_size_read(inode), fsc->max_file_size)) - return -EFBIG; - - if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE)) + if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE)) return -EOPNOTSUPP; if (!S_ISREG(inode->i_mode)) @@ -1756,18 +1752,6 @@ static long ceph_fallocate(struct file *file, int mode, goto unlock; } - if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) && - ceph_quota_is_max_bytes_exceeded(inode, offset + length)) { - ret = -EDQUOT; - goto unlock; - } - - if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL) && - !(mode & FALLOC_FL_PUNCH_HOLE)) { - ret = -ENOSPC; - goto unlock; - } - if (ci->i_inline_version != CEPH_INLINE_NONE) { ret = ceph_uninline_data(file, NULL); if (ret < 0) @@ -1775,12 +1759,12 @@ static long ceph_fallocate(struct file *file, int mode, } size = i_size_read(inode); - if (!(mode & FALLOC_FL_KEEP_SIZE)) { - endoff = offset + length; - ret = inode_newsize_ok(inode, endoff); - if (ret) - goto unlock; - } + + /* Are we punching a hole beyond EOF? */ + if (offset >= size) + goto unlock; + if ((offset + length) > size) + length = size - offset; if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; @@ -1791,16 +1775,8 @@ static long ceph_fallocate(struct file *file, int mode, if (ret < 0) goto unlock; - if (mode & FALLOC_FL_PUNCH_HOLE) { - if (offset < size) - ceph_zero_pagecache_range(inode, offset, length); - ret = ceph_zero_objects(inode, offset, length); - } else if (endoff > size) { - truncate_pagecache_range(inode, size, -1); - if (ceph_inode_set_size(inode, endoff)) - ceph_check_caps(ceph_inode(inode), - CHECK_CAPS_AUTHONLY, NULL); - } + ceph_zero_pagecache_range(inode, offset, length); + ret = ceph_zero_objects(inode, offset, length); if (!ret) { spin_lock(&ci->i_ceph_lock); @@ -1810,9 +1786,6 @@ static long ceph_fallocate(struct file *file, int mode, spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); - if ((endoff > size) && - ceph_quota_is_max_bytes_approaching(inode, endoff)) - ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL); } ceph_put_cap_refs(ci, got); From 94e6992bb560be8bffb47f287194adf070b57695 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 26 Sep 2018 18:03:16 +0200 Subject: [PATCH 09/25] libceph: bump CEPH_MSG_MAX_DATA_LEN If the read is large enough, we end up spinning in the messenger: libceph: osd0 192.168.122.1:6801 io error libceph: osd0 192.168.122.1:6801 io error libceph: osd0 192.168.122.1:6801 io error This is a receive side limit, so only reads were affected. Cc: stable@vger.kernel.org Signed-off-by: Ilya Dryomov --- include/linux/ceph/libceph.h | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 49c93b9308d7..68bb09c29ce8 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -81,7 +81,13 @@ struct ceph_options { #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) -#define CEPH_MSG_MAX_DATA_LEN (16*1024*1024) + +/* + * Handle the largest possible rbd object in one message. + * There is no limit on the size of cephfs objects, but it has to obey + * rsize and wsize mount options anyway. + */ +#define CEPH_MSG_MAX_DATA_LEN (32*1024*1024) #define CEPH_AUTH_NAME_DEFAULT "guest" From 24639ce56040a8ea890ad8836068c1ad8bd177c7 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 26 Sep 2018 19:12:07 +0200 Subject: [PATCH 10/25] libceph: osd_req_op_cls_init() doesn't need to take opcode Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 3 +-- include/linux/ceph/osd_client.h | 5 ++--- net/ceph/osd_client.c | 9 ++++----- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index f2fe692dda40..9cc7ee3b427f 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2374,8 +2374,7 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) if (!obj_req->osd_req) return -ENOMEM; - ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", - "copyup"); + ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup"); if (ret) return ret; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 02096da01845..f3e828460c91 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -444,9 +444,8 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages); -extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode, - const char *class, const char *method); +int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, + const char *class, const char *method); extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *name, const void *value, size_t size, u8 cmp_op, u8 cmp_mode); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 60934bd8796c..a871b234cd90 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -767,15 +767,14 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, EXPORT_SYMBOL(osd_req_op_extent_dup_last); int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, - u16 opcode, const char *class, const char *method) + const char *class, const char *method) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - opcode, 0); + struct ceph_osd_req_op *op; struct ceph_pagelist *pagelist; size_t payload_len = 0; size_t size; - BUG_ON(opcode != CEPH_OSD_OP_CALL); + op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0); pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); if (!pagelist) @@ -4962,7 +4961,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, if (ret) goto out_put_req; - ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + ret = osd_req_op_cls_init(req, 0, class, method); if (ret) goto out_put_req; From 33165d472310262d8c79c7e4d1a17dc60cea7e35 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 28 Sep 2018 15:38:34 +0200 Subject: [PATCH 11/25] libceph: introduce ceph_pagelist_alloc() struct ceph_pagelist cannot be embedded into anything else because it has its own refcount. Merge allocation and initialization together. Signed-off-by: Ilya Dryomov --- fs/ceph/acl.c | 3 +-- fs/ceph/mds_client.c | 3 +-- fs/ceph/xattr.c | 3 +-- include/linux/ceph/pagelist.h | 11 +---------- net/ceph/osd_client.c | 14 ++++---------- net/ceph/pagelist.c | 20 ++++++++++++++++++++ 6 files changed, 28 insertions(+), 26 deletions(-) diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 8a9b562ae4ca..5f0103f40079 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -206,10 +206,9 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode, tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL); if (!tmp_buf) goto out_err; - pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL); + pagelist = ceph_pagelist_alloc(GFP_KERNEL); if (!pagelist) goto out_err; - ceph_pagelist_init(pagelist); err = ceph_pagelist_reserve(pagelist, PAGE_SIZE); if (err) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index bc43c822426a..580a79b9a91f 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3126,10 +3126,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, pr_info("mds%d reconnect start\n", mds); - pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) goto fail_nopagelist; - ceph_pagelist_init(pagelist); reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); if (!reply) diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 5cc8b94f8206..316f6ad10644 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -951,11 +951,10 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name, if (size > 0) { /* copy value into pagelist */ - pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); err = ceph_pagelist_append(pagelist, value, size); if (err) goto out; diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h index d0223364349f..5dead8486fd8 100644 --- a/include/linux/ceph/pagelist.h +++ b/include/linux/ceph/pagelist.h @@ -23,16 +23,7 @@ struct ceph_pagelist_cursor { size_t room; /* room remaining to reset to */ }; -static inline void ceph_pagelist_init(struct ceph_pagelist *pl) -{ - INIT_LIST_HEAD(&pl->head); - pl->mapped_tail = NULL; - pl->length = 0; - pl->room = 0; - INIT_LIST_HEAD(&pl->free_list); - pl->num_pages_free = 0; - refcount_set(&pl->refcnt, 1); -} +struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags); extern void ceph_pagelist_release(struct ceph_pagelist *pl); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a871b234cd90..db2ebc9e5f1f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -776,12 +776,10 @@ int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0); - pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); - op->cls.class_name = class; size = strlen(class); BUG_ON(size > (size_t) U8_MAX); @@ -814,12 +812,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); - pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); - payload_len = strlen(name); op->xattr.name_len = payload_len; ceph_pagelist_append(pagelist, name, payload_len); @@ -4598,11 +4594,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); - pl = kmalloc(sizeof(*pl), GFP_NOIO); + pl = ceph_pagelist_alloc(GFP_NOIO); if (!pl) return -ENOMEM; - ceph_pagelist_init(pl); ret = ceph_pagelist_encode_64(pl, notify_id); ret |= ceph_pagelist_encode_64(pl, cookie); if (payload) { @@ -4669,11 +4664,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which, op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); op->notify.cookie = cookie; - pl = kmalloc(sizeof(*pl), GFP_NOIO); + pl = ceph_pagelist_alloc(GFP_NOIO); if (!pl) return -ENOMEM; - ceph_pagelist_init(pl); ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ ret |= ceph_pagelist_encode_32(pl, timeout); ret |= ceph_pagelist_encode_32(pl, payload_len); diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c index 2ea0564771d2..65e34f78b05d 100644 --- a/net/ceph/pagelist.c +++ b/net/ceph/pagelist.c @@ -6,6 +6,26 @@ #include #include +struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags) +{ + struct ceph_pagelist *pl; + + pl = kmalloc(sizeof(*pl), gfp_flags); + if (!pl) + return NULL; + + INIT_LIST_HEAD(&pl->head); + pl->mapped_tail = NULL; + pl->length = 0; + pl->room = 0; + INIT_LIST_HEAD(&pl->free_list); + pl->num_pages_free = 0; + refcount_set(&pl->refcnt, 1); + + return pl; +} +EXPORT_SYMBOL(ceph_pagelist_alloc); + static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl) { if (pl->mapped_tail) { From 894868330a1e038ea4a65dbb81741eef70ad71b1 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 28 Sep 2018 16:02:53 +0200 Subject: [PATCH 12/25] libceph: don't consume a ref on pagelist in ceph_msg_data_add_pagelist() Because send_mds_reconnect() wants to send a message with a pagelist and pass the ownership to the messenger, ceph_msg_data_add_pagelist() consumes a ref which is then put in ceph_msg_data_destroy(). This makes managing pagelists in the OSD client (where they are wrapped in ceph_osd_data) unnecessarily hard because the handoff only happens in ceph_osdc_start_request() instead of when the pagelist is passed to ceph_osd_data_pagelist_init(). I counted several memory leaks on various error paths. Fix up ceph_msg_data_add_pagelist() and carry a pagelist ref in ceph_osd_data. Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 2 +- net/ceph/messenger.c | 1 + net/ceph/osd_client.c | 8 ++++++++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 580a79b9a91f..97de674ea377 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2136,7 +2136,6 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_pagelist) { struct ceph_pagelist *pagelist = req->r_pagelist; - refcount_inc(&pagelist->refcnt); ceph_msg_data_add_pagelist(msg, pagelist); msg->hdr.data_len = cpu_to_le32(pagelist->length); } else { @@ -3240,6 +3239,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); up_read(&mdsc->snap_rwsem); + ceph_pagelist_release(pagelist); return; fail: diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 0a187196aeed..76684edc43ef 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -3313,6 +3313,7 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg, data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); BUG_ON(!data); + refcount_inc(&pagelist->refcnt); data->pagelist = pagelist; list_add_tail(&data->links, &msg->data); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index db2ebc9e5f1f..ad00495dbd39 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data) osd_data->type = CEPH_OSD_DATA_TYPE_NONE; } +/* + * Consumes @pages if @own_pages is true. + */ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages) @@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, osd_data->own_pages = own_pages; } +/* + * Consumes a ref on @pagelist. + */ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, struct ceph_pagelist *pagelist) { @@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data) num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); ceph_release_page_vector(osd_data->pages, num_pages); + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { + ceph_pagelist_release(osd_data->pagelist); } ceph_osd_data_init(osd_data); } From 668028844174aa7069da1f8ea89a5dbc93e86216 Mon Sep 17 00:00:00 2001 From: Xuehan Xu Date: Thu, 11 Oct 2018 17:55:39 +0800 Subject: [PATCH 13/25] ceph: set timeout conditionally in __cap_delay_requeue __cap_delay_requeue could be invoked through ceph_check_caps when there exists caps that needs to be sent and are delayed by "i_hold_caps_min" or "i_hold_caps_max". If __cap_delay_requeue sets timeout unconditionally, there could be a chance that some "wanted" caps can not be release for a long since their timeouts are reset every time they get delayed. Fixes: http://tracker.ceph.com/issues/36369 Signed-off-by: Xuehan Xu Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index b26ae1673992..f36946fdfb00 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -519,7 +519,8 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc, * -> we take mdsc->cap_delay_lock */ static void __cap_delay_requeue(struct ceph_mds_client *mdsc, - struct ceph_inode_info *ci) + struct ceph_inode_info *ci, + bool set_timeout) { dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode, ci->i_ceph_flags, ci->i_hold_caps_max); @@ -530,7 +531,8 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc, goto no_change; list_del_init(&ci->i_cap_delay_list); } - __cap_set_timeouts(mdsc, ci); + if (set_timeout) + __cap_set_timeouts(mdsc, ci); list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list); no_change: spin_unlock(&mdsc->cap_delay_lock); @@ -720,7 +722,7 @@ void ceph_add_cap(struct inode *inode, dout(" issued %s, mds wanted %s, actual %s, queueing\n", ceph_cap_string(issued), ceph_cap_string(wanted), ceph_cap_string(actual_wanted)); - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, true); } if (flags & CEPH_CAP_FLAG_AUTH) { @@ -1647,7 +1649,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && (mask & CEPH_CAP_FILE_BUFFER)) dirty |= I_DIRTY_DATASYNC; - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, true); return dirty; } @@ -2065,7 +2067,7 @@ ack: /* Reschedule delayed caps release if we delayed anything */ if (delayed) - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, false); spin_unlock(&ci->i_ceph_lock); @@ -2125,7 +2127,7 @@ retry: if (delayed) { spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, true); spin_unlock(&ci->i_ceph_lock); } } else { From 41a264e1b30ccde9ac83a314559ddacb78a85e91 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Sat, 13 Oct 2018 11:36:52 +0200 Subject: [PATCH 14/25] libceph: no need to call osd_req_opcode_valid() in osd_req_encode_op() Any uninitialized or unknown ops will be caught by the default clause anyway. Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ad00495dbd39..f403a483d51d 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -903,12 +903,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, static u32 osd_req_encode_op(struct ceph_osd_op *dst, const struct ceph_osd_req_op *src) { - if (WARN_ON(!osd_req_opcode_valid(src->op))) { - pr_err("unrecognized osd opcode %d\n", src->op); - - return 0; - } - switch (src->op) { case CEPH_OSD_OP_STAT: break; From 61d2f855042cfcce9b78fa10fe7cd2020598263b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 11 Oct 2018 16:15:38 +0200 Subject: [PATCH 15/25] ceph: num_ops is off by one in ceph_aio_retry_work() Two OSD op slots are allocated, but only one is ever used. Signed-off-by: Ilya Dryomov --- fs/ceph/file.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 213e7d98248a..0265f9ae0ab9 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -858,7 +858,7 @@ static void ceph_aio_retry_work(struct work_struct *work) } spin_unlock(&ci->i_ceph_lock); - req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2, + req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1, false, GFP_NOFS); if (!req) { ret = -ENOMEM; From 3b83f60da6dd1becd865c1e2745123a8ae378c25 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 11 Oct 2018 17:04:33 +0200 Subject: [PATCH 16/25] libceph: enable fallback to ceph_msg_new() in ceph_msgpool_get() ceph_msgpool_get() can fall back to ceph_msg_new() when it is asked for a message whose front portion is larger than pool->front_len. However the caller always passes 0, effectively disabling that code path. The allocation goes to the message pool and returns a message with a front that is smaller than requested, setting us up for a crash. One example of this is a directory with a large number of snapshots. If its snap context doesn't fit, we oops in encode_request_partial(). Signed-off-by: Ilya Dryomov --- net/ceph/msgpool.c | 2 +- net/ceph/osd_client.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c index 72571535883f..3dddc074f0d7 100644 --- a/net/ceph/msgpool.c +++ b/net/ceph/msgpool.c @@ -61,7 +61,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, if (front_len > pool->front_len) { dout("msgpool_get %s need front %d, pool size is %d\n", pool->name, front_len, pool->front_len); - WARN_ON(1); + WARN_ON_ONCE(1); /* try to alloc a fresh message */ return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f403a483d51d..35bc77c8c230 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -641,7 +641,7 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += 4 + 8; /* retry_attempt, features */ if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op, 0); + msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size); else msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); if (!msg) @@ -656,7 +656,7 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); + msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size); else msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); if (!msg) From 81c65213d73cc51f2f2e4326d602e107c07afb05 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 11 Oct 2018 12:58:33 +0200 Subject: [PATCH 17/25] libceph: assign cookies in linger_submit() Register lingers directly in linger_submit(). This avoids allocating memory for notify pagelist while holding osdc->lock and simplifies both callers of linger_submit(). Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 48 ++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 35bc77c8c230..17d1d2a04a7a 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2998,11 +2998,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq) struct ceph_osd_client *osdc = lreq->osdc; struct ceph_osd *osd; + down_write(&osdc->lock); + linger_register(lreq); + if (lreq->is_watch) { + lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id; + lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id; + } else { + lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id; + } + calc_target(osdc, &lreq->t, NULL, false); osd = lookup_create_osd(osdc, lreq->t.osd, true); link_linger(osd, lreq); send_linger(lreq); + up_write(&osdc->lock); } static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) @@ -4523,15 +4533,14 @@ ceph_osdc_watch(struct ceph_osd_client *osdc, goto err_put_lreq; } - down_write(&osdc->lock); - linger_register(lreq); /* before osd_req_op_* */ - osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, - CEPH_OSD_WATCH_OP_WATCH); - osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, - CEPH_OSD_WATCH_OP_PING); - linger_submit(lreq); - up_write(&osdc->lock); + /* + * Pass 0 for cookie because we don't know it yet, it will be + * filled in by linger_submit(). + */ + osd_req_op_watch_init(lreq->reg_req, 0, 0, CEPH_OSD_WATCH_OP_WATCH); + osd_req_op_watch_init(lreq->ping_req, 0, 0, CEPH_OSD_WATCH_OP_PING); + linger_submit(lreq); ret = linger_reg_commit_wait(lreq); if (ret) { linger_cancel(lreq); @@ -4728,29 +4737,26 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, goto out_put_lreq; } + /* + * Pass 0 for cookie because we don't know it yet, it will be + * filled in by linger_submit(). + */ + ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout, + payload, payload_len); + if (ret) + goto out_put_lreq; + /* for notify_id */ pages = ceph_alloc_page_vector(1, GFP_NOIO); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out_put_lreq; } - - down_write(&osdc->lock); - linger_register(lreq); /* before osd_req_op_* */ - ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1, - timeout, payload, payload_len); - if (ret) { - linger_unregister(lreq); - up_write(&osdc->lock); - ceph_release_page_vector(pages, 1); - goto out_put_lreq; - } ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, response_data), pages, PAGE_SIZE, 0, false, true); - linger_submit(lreq); - up_write(&osdc->lock); + linger_submit(lreq); ret = linger_reg_commit_wait(lreq); if (!ret) ret = linger_notify_finish_wait(lreq); From 39e58c3425b1816c84724e8a77f9f9daa0143263 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 15 Oct 2018 15:26:27 +0200 Subject: [PATCH 18/25] libceph: introduce alloc_watch_request() ceph_osdc_alloc_messages() call will be moved out of alloc_linger_request() in the next commit, which means that ceph_osdc_watch() will need to call ceph_osdc_alloc_messages() twice. Add a helper for that. Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 17d1d2a04a7a..a5fbb38086b6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -4492,6 +4492,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq) return req; } +static struct ceph_osd_request * +alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode) +{ + struct ceph_osd_request *req; + + req = alloc_linger_request(lreq); + if (!req) + return NULL; + + /* + * Pass 0 for cookie because we don't know it yet, it will be + * filled in by linger_submit(). + */ + osd_req_op_watch_init(req, 0, 0, watch_opcode); + return req; +} + /* * Returns a handle, caller owns a ref. */ @@ -4521,25 +4538,18 @@ ceph_osdc_watch(struct ceph_osd_client *osdc, lreq->t.flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts64(&lreq->mtime); - lreq->reg_req = alloc_linger_request(lreq); + lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH); if (!lreq->reg_req) { ret = -ENOMEM; goto err_put_lreq; } - lreq->ping_req = alloc_linger_request(lreq); + lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING); if (!lreq->ping_req) { ret = -ENOMEM; goto err_put_lreq; } - /* - * Pass 0 for cookie because we don't know it yet, it will be - * filled in by linger_submit(). - */ - osd_req_op_watch_init(lreq->reg_req, 0, 0, CEPH_OSD_WATCH_OP_WATCH); - osd_req_op_watch_init(lreq->ping_req, 0, 0, CEPH_OSD_WATCH_OP_PING); - linger_submit(lreq); ret = linger_reg_commit_wait(lreq); if (ret) { From 26f887e0a3c43f67b550e2e5d8a76e86ca11d188 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 15 Oct 2018 16:11:37 +0200 Subject: [PATCH 19/25] libceph, rbd, ceph: move ceph_osdc_alloc_messages() calls The current requirement is that ceph_osdc_alloc_messages() should be called after oid and oloc are known. In preparation for preallocating message data items, move ceph_osdc_alloc_messages() further down, so that it is called when OSD op codes are known. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 19 ++++++++++++------- fs/ceph/file.c | 10 +++++----- net/ceph/osd_client.c | 38 +++++++++++++++++++++----------------- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 9cc7ee3b427f..8e5140bbf241 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) goto err_req; - if (ceph_osdc_alloc_messages(req, GFP_NOIO)) - goto err_req; - return req; err_req: @@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req) } if (ret) return ret; + + ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + if (ret) + return ret; } return 0; @@ -2404,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) rbd_assert(0); } + ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + if (ret) + return ret; + rbd_obj_request_submit(obj_req); return 0; } @@ -3783,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); - if (ret) - goto out_req; - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -3797,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, true); + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); + if (ret) + goto out_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0265f9ae0ab9..0fa6b6b6ccbc 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -870,6 +870,11 @@ static void ceph_aio_retry_work(struct work_struct *work) ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); + req->r_ops[0] = orig_req->r_ops[0]; + + req->r_mtime = aio_req->mtime; + req->r_data_offset = req->r_ops[0].extent.offset; + ret = ceph_osdc_alloc_messages(req, GFP_NOFS); if (ret) { ceph_osdc_put_request(req); @@ -877,11 +882,6 @@ static void ceph_aio_retry_work(struct work_struct *work) goto out; } - req->r_ops[0] = orig_req->r_ops[0]; - - req->r_mtime = aio_req->mtime; - req->r_data_offset = req->r_ops[0].extent.offset; - ceph_osdc_put_request(orig_req); req->r_callback = ceph_aio_complete_req; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a5fbb38086b6..7ac7f21ff317 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -4483,12 +4483,6 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq) ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); - - if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { - ceph_osdc_put_request(req); - return NULL; - } - return req; } @@ -4506,6 +4500,12 @@ alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode) * filled in by linger_submit(). */ osd_req_op_watch_init(req, 0, 0, watch_opcode); + + if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { + ceph_osdc_put_request(req); + return NULL; + } + return req; } @@ -4656,12 +4656,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, + payload_len); if (ret) goto out_put_req; - ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, - payload_len); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); if (ret) goto out_put_req; @@ -4766,6 +4766,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, response_data), pages, PAGE_SIZE, 0, false, true); + ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO); + if (ret) + goto out_put_lreq; + linger_submit(lreq); ret = linger_reg_commit_wait(lreq); if (!ret) @@ -4892,10 +4896,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); - if (ret) - goto out_put_req; - pages = ceph_alloc_page_vector(1, GFP_NOIO); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -4907,6 +4907,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, response_data), pages, PAGE_SIZE, 0, false, true); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { @@ -4969,10 +4973,6 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = flags; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); - if (ret) - goto out_put_req; - ret = osd_req_op_cls_init(req, 0, class, method); if (ret) goto out_put_req; @@ -4984,6 +4984,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, osd_req_op_cls_response_data_pages(req, 0, &resp_page, *resp_len, 0, false, false); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { From 0d9c1ab3be4c0187663096a6a084421d0a1e45c6 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 15 Oct 2018 17:38:23 +0200 Subject: [PATCH 20/25] libceph: preallocate message data items Currently message data items are allocated with ceph_msg_data_create() in setup_request_data() inside send_request(). send_request() has never been allowed to fail, so each allocation is followed by a BUG_ON: data = ceph_msg_data_create(...); BUG_ON(!data); It's been this way since support for multiple message data items was added in commit 6644ed7b7e04 ("libceph: make message data be a pointer") in 3.10. There is no reason to delay the allocation of message data items until the last possible moment and we certainly don't need a linked list of them as they are only ever appended to the end and never erased. Make ceph_msg_new2() take max_data_items and adapt the rest of the code. Reported-by: Jerry Lee Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 4 +- include/linux/ceph/messenger.h | 24 ++------ include/linux/ceph/msgpool.h | 11 ++-- net/ceph/messenger.c | 106 ++++++++++++--------------------- net/ceph/msgpool.c | 25 +++++--- net/ceph/osd_client.c | 102 ++++++++++++++++++++++++++----- 6 files changed, 157 insertions(+), 115 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 97de674ea377..67a9aeb2f4ec 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2071,7 +2071,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_old_dentry_drop) len += req->r_old_dentry->d_name.len; - msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false); + msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); if (!msg) { msg = ERR_PTR(-ENOMEM); goto out_free2; @@ -3129,7 +3129,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, if (!pagelist) goto fail_nopagelist; - reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); + reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); if (!reply) goto fail_nomsg; diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index fc2b4491ee0a..800a2128d411 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -82,22 +82,6 @@ enum ceph_msg_data_type { CEPH_MSG_DATA_BVECS, /* data source/destination is a bio_vec array */ }; -static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) -{ - switch (type) { - case CEPH_MSG_DATA_NONE: - case CEPH_MSG_DATA_PAGES: - case CEPH_MSG_DATA_PAGELIST: -#ifdef CONFIG_BLOCK - case CEPH_MSG_DATA_BIO: -#endif /* CONFIG_BLOCK */ - case CEPH_MSG_DATA_BVECS: - return true; - default: - return false; - } -} - #ifdef CONFIG_BLOCK struct ceph_bio_iter { @@ -181,7 +165,6 @@ struct ceph_bvec_iter { } while (0) struct ceph_msg_data { - struct list_head links; /* ceph_msg->data */ enum ceph_msg_data_type type; union { #ifdef CONFIG_BLOCK @@ -202,7 +185,6 @@ struct ceph_msg_data { struct ceph_msg_data_cursor { size_t total_resid; /* across all data items */ - struct list_head *data_head; /* = &ceph_msg->data */ struct ceph_msg_data *data; /* current data item */ size_t resid; /* bytes not yet consumed */ @@ -240,7 +222,9 @@ struct ceph_msg { struct ceph_buffer *middle; size_t data_length; - struct list_head data; + struct ceph_msg_data *data; + int num_data_items; + int max_data_items; struct ceph_msg_data_cursor cursor; struct ceph_connection *con; @@ -381,6 +365,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, void ceph_msg_data_add_bvecs(struct ceph_msg *msg, struct ceph_bvec_iter *bvec_pos); +struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, + gfp_t flags, bool can_fail); extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, bool can_fail); diff --git a/include/linux/ceph/msgpool.h b/include/linux/ceph/msgpool.h index 76c98a512758..729cdf700eae 100644 --- a/include/linux/ceph/msgpool.h +++ b/include/linux/ceph/msgpool.h @@ -13,14 +13,15 @@ struct ceph_msgpool { mempool_t *pool; int type; /* preallocated message type */ int front_len; /* preallocated payload size */ + int max_data_items; }; -extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type, - int front_len, int size, bool blocking, - const char *name); +int ceph_msgpool_init(struct ceph_msgpool *pool, int type, + int front_len, int max_data_items, int size, + const char *name); extern void ceph_msgpool_destroy(struct ceph_msgpool *pool); -extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *, - int front_len); +struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len, + int max_data_items); extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *); #endif diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 76684edc43ef..88e35830198c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con, /* Slab caches for frequently-allocated structures */ static struct kmem_cache *ceph_msg_cache; -static struct kmem_cache *ceph_msg_data_cache; /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; @@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void) if (!ceph_msg_cache) return -ENOMEM; - BUG_ON(ceph_msg_data_cache); - ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0); - if (ceph_msg_data_cache) - return 0; - - kmem_cache_destroy(ceph_msg_cache); - ceph_msg_cache = NULL; - - return -ENOMEM; + return 0; } static void ceph_msgr_slab_exit(void) { - BUG_ON(!ceph_msg_data_cache); - kmem_cache_destroy(ceph_msg_data_cache); - ceph_msg_data_cache = NULL; - BUG_ON(!ceph_msg_cache); kmem_cache_destroy(ceph_msg_cache); ceph_msg_cache = NULL; @@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length) { struct ceph_msg_data_cursor *cursor = &msg->cursor; - struct ceph_msg_data *data; BUG_ON(!length); BUG_ON(length > msg->data_length); - BUG_ON(list_empty(&msg->data)); + BUG_ON(!msg->num_data_items); - cursor->data_head = &msg->data; cursor->total_resid = length; - data = list_first_entry(&msg->data, struct ceph_msg_data, links); - cursor->data = data; + cursor->data = msg->data; __ceph_msg_data_cursor_init(cursor); } @@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, if (!cursor->resid && cursor->total_resid) { WARN_ON(!cursor->last_piece); - BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); - cursor->data = list_next_entry(cursor->data, links); + cursor->data++; __ceph_msg_data_cursor_init(cursor); new_piece = true; } @@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con) static void prepare_message_data(struct ceph_msg *msg, u32 data_len) { - BUG_ON(!msg); - BUG_ON(!data_len); - /* Initialize data cursor */ ceph_msg_data_cursor_init(msg, (size_t)data_len); @@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con) dout("%s %p msg %p\n", __func__, con, msg); - if (list_empty(&msg->data)) + if (!msg->num_data_items) return -EINVAL; /* @@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con) u32 crc = 0; int ret; - BUG_ON(!msg); - if (list_empty(&msg->data)) + if (!msg->num_data_items) return -EIO; if (do_datacrc) @@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con, return false; } -static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) +static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg) { - struct ceph_msg_data *data; - - if (WARN_ON(!ceph_msg_data_type_valid(type))) - return NULL; - - data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS); - if (!data) - return NULL; - - data->type = type; - INIT_LIST_HEAD(&data->links); - - return data; + BUG_ON(msg->num_data_items >= msg->max_data_items); + return &msg->data[msg->num_data_items++]; } static void ceph_msg_data_destroy(struct ceph_msg_data *data) { - if (!data) - return; - - WARN_ON(!list_empty(&data->links)); if (data->type == CEPH_MSG_DATA_PAGELIST) ceph_pagelist_release(data->pagelist); - kmem_cache_free(ceph_msg_data_cache, data); } void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, @@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, BUG_ON(!pages); BUG_ON(!length); - data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_PAGES; data->pages = pages; data->length = length; data->alignment = alignment & ~PAGE_MASK; - list_add_tail(&data->links, &msg->data); msg->data_length += length; } EXPORT_SYMBOL(ceph_msg_data_add_pages); @@ -3311,12 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg, BUG_ON(!pagelist); BUG_ON(!pagelist->length); - data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_PAGELIST; refcount_inc(&pagelist->refcnt); data->pagelist = pagelist; - list_add_tail(&data->links, &msg->data); msg->data_length += pagelist->length; } EXPORT_SYMBOL(ceph_msg_data_add_pagelist); @@ -3327,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, { struct ceph_msg_data *data; - data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_BIO; data->bio_pos = *bio_pos; data->bio_length = length; - list_add_tail(&data->links, &msg->data); msg->data_length += length; } EXPORT_SYMBOL(ceph_msg_data_add_bio); @@ -3343,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg, { struct ceph_msg_data *data; - data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_BVECS; data->bvec_pos = *bvec_pos; - list_add_tail(&data->links, &msg->data); msg->data_length += bvec_pos->iter.bi_size; } EXPORT_SYMBOL(ceph_msg_data_add_bvecs); @@ -3356,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs); * construct a new message with given type, size * the new msg has a ref count of 1. */ -struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, - bool can_fail) +struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, + gfp_t flags, bool can_fail) { struct ceph_msg *m; @@ -3371,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->list_head); kref_init(&m->kref); - INIT_LIST_HEAD(&m->data); /* front */ if (front_len) { @@ -3386,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, } m->front_alloc_len = m->front.iov_len = front_len; + if (max_data_items) { + m->data = kmalloc_array(max_data_items, sizeof(*m->data), + flags); + if (!m->data) + goto out2; + + m->max_data_items = max_data_items; + } + dout("ceph_msg_new %p front %d\n", m, front_len); return m; @@ -3402,6 +3369,13 @@ out: } return NULL; } +EXPORT_SYMBOL(ceph_msg_new2); + +struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, + bool can_fail) +{ + return ceph_msg_new2(type, front_len, 0, flags, can_fail); +} EXPORT_SYMBOL(ceph_msg_new); /* @@ -3497,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m) { dout("%s %p\n", __func__, m); kvfree(m->front.iov_base); + kfree(m->data); kmem_cache_free(ceph_msg_cache, m); } static void ceph_msg_release(struct kref *kref) { struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); - struct ceph_msg_data *data, *next; + int i; dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); @@ -3516,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref) m->middle = NULL; } - list_for_each_entry_safe(data, next, &m->data, links) { - list_del_init(&data->links); - ceph_msg_data_destroy(data); - } - m->data_length = 0; + for (i = 0; i < m->num_data_items; i++) + ceph_msg_data_destroy(&m->data[i]); if (m->pool) ceph_msgpool_put(m->pool, m); diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c index 3dddc074f0d7..e3ecb80cd182 100644 --- a/net/ceph/msgpool.c +++ b/net/ceph/msgpool.c @@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg) struct ceph_msgpool *pool = arg; struct ceph_msg *msg; - msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true); + msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items, + gfp_mask, true); if (!msg) { dout("msgpool_alloc %s failed\n", pool->name); } else { @@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg) } int ceph_msgpool_init(struct ceph_msgpool *pool, int type, - int front_len, int size, bool blocking, const char *name) + int front_len, int max_data_items, int size, + const char *name) { dout("msgpool %s init\n", name); pool->type = type; pool->front_len = front_len; + pool->max_data_items = max_data_items; pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); if (!pool->pool) return -ENOMEM; @@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool) mempool_destroy(pool->pool); } -struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, - int front_len) +struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len, + int max_data_items) { struct ceph_msg *msg; - if (front_len > pool->front_len) { - dout("msgpool_get %s need front %d, pool size is %d\n", - pool->name, front_len, pool->front_len); + if (front_len > pool->front_len || + max_data_items > pool->max_data_items) { + pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n", + __func__, front_len, max_data_items, pool->name, + pool->front_len, pool->max_data_items); WARN_ON_ONCE(1); /* try to alloc a fresh message */ - return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); + return ceph_msg_new2(pool->type, front_len, max_data_items, + GFP_NOFS, false); } msg = mempool_alloc(pool->pool, GFP_NOFS); @@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg) msg->front.iov_len = pool->front_len; msg->hdr.front_len = cpu_to_le32(pool->front_len); + msg->data_length = 0; + msg->num_data_items = 0; + kref_init(&msg->kref); /* retake single ref */ mempool_free(msg, pool->pool); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 7ac7f21ff317..cf0bd2cce848 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -614,12 +614,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc) return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); } -int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) +static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp, + int num_request_data_items, + int num_reply_data_items) { struct ceph_osd_client *osdc = req->r_osdc; struct ceph_msg *msg; int msg_size; + WARN_ON(req->r_request || req->r_reply); WARN_ON(ceph_oid_empty(&req->r_base_oid)); WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); @@ -641,9 +644,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += 4 + 8; /* retry_attempt, features */ if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size); + msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size, + num_request_data_items); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); + msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size, + num_request_data_items, gfp, true); if (!msg) return -ENOMEM; @@ -656,9 +661,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size); + msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size, + num_reply_data_items); else - msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); + msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size, + num_reply_data_items, gfp, true); if (!msg) return -ENOMEM; @@ -666,7 +673,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) return 0; } -EXPORT_SYMBOL(ceph_osdc_alloc_messages); static bool osd_req_opcode_valid(u16 opcode) { @@ -679,6 +685,64 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) } } +static void get_num_data_items(struct ceph_osd_request *req, + int *num_request_data_items, + int *num_reply_data_items) +{ + struct ceph_osd_req_op *op; + + *num_request_data_items = 0; + *num_reply_data_items = 0; + + for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) { + switch (op->op) { + /* request */ + case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_NOTIFY_ACK: + *num_request_data_items += 1; + break; + + /* reply */ + case CEPH_OSD_OP_STAT: + case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_LIST_WATCHERS: + *num_reply_data_items += 1; + break; + + /* both */ + case CEPH_OSD_OP_NOTIFY: + *num_request_data_items += 1; + *num_reply_data_items += 1; + break; + case CEPH_OSD_OP_CALL: + *num_request_data_items += 2; + *num_reply_data_items += 1; + break; + + default: + WARN_ON(!osd_req_opcode_valid(op->op)); + break; + } + } +} + +/* + * oid, oloc and OSD op opcode(s) must be filled in before this function + * is called. + */ +int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) +{ + int num_request_data_items, num_reply_data_items; + + get_num_data_items(req, &num_request_data_items, &num_reply_data_items); + return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items, + num_reply_data_items); +} +EXPORT_SYMBOL(ceph_osdc_alloc_messages); + /* * This is an osd op init function for opcodes that have no data or * other information associated with them. It also serves as a @@ -1035,7 +1099,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (flags & CEPH_OSD_FLAG_WRITE) req->r_data_offset = off; - r = ceph_osdc_alloc_messages(req, GFP_NOFS); + if (num_ops > 1) + /* + * This is a special case for ceph_writepages_start(), but it + * also covers ceph_uninline_data(). If more multi-op request + * use cases emerge, we will need a separate helper. + */ + r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0); + else + r = ceph_osdc_alloc_messages(req, GFP_NOFS); if (r) goto fail; @@ -1842,13 +1914,16 @@ static bool should_plug_request(struct ceph_osd_request *req) return true; } +/* + * Keep get_num_data_items() in sync with this function. + */ static void setup_request_data(struct ceph_osd_request *req, struct ceph_msg *msg) { u32 data_len = 0; int i; - if (!list_empty(&msg->data)) + if (msg->num_data_items) return; WARN_ON(msg->data_length); @@ -4325,9 +4400,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc, lreq->notify_id, notify_id); } else if (!completion_done(&lreq->notify_finish_wait)) { struct ceph_msg_data *data = - list_first_entry_or_null(&msg->data, - struct ceph_msg_data, - links); + msg->num_data_items ? &msg->data[0] : NULL; if (data) { if (lreq->preply_pages) { @@ -5036,11 +5109,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) goto out_map; err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, - PAGE_SIZE, 10, true, "osd_op"); + PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op"); if (err < 0) goto out_mempool; err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, - PAGE_SIZE, 10, true, "osd_op_reply"); + PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, + "osd_op_reply"); if (err < 0) goto out_msgpool; @@ -5310,7 +5384,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr) u32 front_len = le32_to_cpu(hdr->front_len); u32 data_len = le32_to_cpu(hdr->data_len); - m = ceph_msg_new(type, front_len, GFP_NOIO, false); + m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false); if (!m) return NULL; From 98c4bfe9d89b22d7bfddf6469241658920b6fafe Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 17 Oct 2018 14:23:04 +0200 Subject: [PATCH 21/25] libceph: check reply num_data_items in setup_request_data() setup_request_data() adds message data items to both request and reply messages, but only checks request num_data_items before proceeding with the loop. This is wrong because if an op doesn't have any request data items but has a reply data item (e.g. read), a duplicate data item gets added to the message on every resend attempt. This went unnoticed for years but now that message data items are preallocated, it promptly crashes in ceph_msg_data_add(). Amend the signature to make it clear that setup_request_data() operates on both request and reply messages. Also, remove data_len assert -- we have another one in prepare_write_message(). Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 48 +++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index cf0bd2cce848..a0148de51cc6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1917,48 +1917,48 @@ static bool should_plug_request(struct ceph_osd_request *req) /* * Keep get_num_data_items() in sync with this function. */ -static void setup_request_data(struct ceph_osd_request *req, - struct ceph_msg *msg) +static void setup_request_data(struct ceph_osd_request *req) { - u32 data_len = 0; - int i; + struct ceph_msg *request_msg = req->r_request; + struct ceph_msg *reply_msg = req->r_reply; + struct ceph_osd_req_op *op; - if (msg->num_data_items) + if (req->r_request->num_data_items || req->r_reply->num_data_items) return; - WARN_ON(msg->data_length); - for (i = 0; i < req->r_num_ops; i++) { - struct ceph_osd_req_op *op = &req->r_ops[i]; - + WARN_ON(request_msg->data_length || reply_msg->data_length); + for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) { switch (op->op) { /* request */ case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: WARN_ON(op->indata_len != op->extent.length); - ceph_osdc_msg_data_add(msg, &op->extent.osd_data); + ceph_osdc_msg_data_add(request_msg, + &op->extent.osd_data); break; case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_CMPXATTR: WARN_ON(op->indata_len != op->xattr.name_len + op->xattr.value_len); - ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); + ceph_osdc_msg_data_add(request_msg, + &op->xattr.osd_data); break; case CEPH_OSD_OP_NOTIFY_ACK: - ceph_osdc_msg_data_add(msg, + ceph_osdc_msg_data_add(request_msg, &op->notify_ack.request_data); break; /* reply */ case CEPH_OSD_OP_STAT: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->raw_data_in); break; case CEPH_OSD_OP_READ: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->extent.osd_data); break; case CEPH_OSD_OP_LIST_WATCHERS: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->list_watchers.response_data); break; @@ -1967,25 +1967,23 @@ static void setup_request_data(struct ceph_osd_request *req, WARN_ON(op->indata_len != op->cls.class_len + op->cls.method_len + op->cls.indata_len); - ceph_osdc_msg_data_add(msg, &op->cls.request_info); + ceph_osdc_msg_data_add(request_msg, + &op->cls.request_info); /* optional, can be NONE */ - ceph_osdc_msg_data_add(msg, &op->cls.request_data); + ceph_osdc_msg_data_add(request_msg, + &op->cls.request_data); /* optional, can be NONE */ - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->cls.response_data); break; case CEPH_OSD_OP_NOTIFY: - ceph_osdc_msg_data_add(msg, + ceph_osdc_msg_data_add(request_msg, &op->notify.request_data); - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->notify.response_data); break; } - - data_len += op->indata_len; } - - WARN_ON(data_len != msg->data_length); } static void encode_pgid(void **p, const struct ceph_pg *pgid) @@ -2033,7 +2031,7 @@ static void encode_request_partial(struct ceph_osd_request *req, req->r_data_offset || req->r_snapc); } - setup_request_data(req, msg); + setup_request_data(req); encode_spgid(&p, &req->r_t.spgid); /* actual spg */ ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ From 2ee9dd958d474252510b8c4dc216aa1dab7ad272 Mon Sep 17 00:00:00 2001 From: Luis Henriques Date: Mon, 15 Oct 2018 16:45:57 +0100 Subject: [PATCH 22/25] ceph: add non-blocking parameter to ceph_try_get_caps() ceph_try_get_caps currently calls try_get_cap_refs with the nonblock parameter always set to 'true'. This change adds a new parameter that allows to set it's value. This will be useful for a follow-up patch that will need to get two sets of capabilities for two different inodes without risking a deadlock. Signed-off-by: Luis Henriques Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/addr.c | 2 +- fs/ceph/caps.c | 7 ++++--- fs/ceph/super.h | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 9c332a6f6667..8eade7a993c1 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -322,7 +322,7 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx, /* caller of readpages does not hold buffer and read caps * (fadvise, madvise and readahead cases) */ int want = CEPH_CAP_FILE_CACHE; - ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got); + ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, true, &got); if (ret < 0) { dout("start_read %p, error getting cap\n", inode); } else if (!(got & want)) { diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index f36946fdfb00..f3496db4bb3e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2673,17 +2673,18 @@ static void check_max_size(struct inode *inode, loff_t endoff) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } -int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) +int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, + bool nonblock, int *got) { int ret, err = 0; BUG_ON(need & ~CEPH_CAP_FILE_RD); - BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); + BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED)); ret = ceph_pool_perm_check(ci, need); if (ret < 0) return ret; - ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); + ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err); if (ret) { if (err == -EAGAIN) { ret = 0; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 582e28fd1b7b..91b13400badd 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -1008,7 +1008,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn, extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page); extern int ceph_try_get_caps(struct ceph_inode_info *ci, - int need, int want, int *got); + int need, int want, bool nonblock, int *got); /* for counting open files by mode */ extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode); From 23ddf9bea900b4ce39e5707e1ddf66062cfe6d91 Mon Sep 17 00:00:00 2001 From: Luis Henriques Date: Mon, 15 Oct 2018 16:45:58 +0100 Subject: [PATCH 23/25] libceph: support the RADOS copy-from operation Add support for performing remote object copies using the 'copy-from' operation. [ Add COPY_FROM to get_num_data_items(). ] Signed-off-by: Luis Henriques Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- include/linux/ceph/osd_client.h | 17 +++++++ include/linux/ceph/rados.h | 28 ++++++++++ net/ceph/osd_client.c | 90 +++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index f3e828460c91..7a2af5034278 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -136,6 +136,13 @@ struct ceph_osd_req_op { u64 expected_object_size; u64 expected_write_size; } alloc_hint; + struct { + u64 snapid; + u64 src_version; + u8 flags; + u32 src_fadvise_flags; + struct ceph_osd_data osd_data; + } copy_from; }; }; @@ -510,6 +517,16 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct timespec64 *mtime, struct page **pages, int nr_pages); +int ceph_osdc_copy_from(struct ceph_osd_client *osdc, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + struct ceph_object_id *dst_oid, + struct ceph_object_locator *dst_oloc, + u32 dst_fadvise_flags, + u8 copy_from_flags); + /* watch/notify */ struct ceph_osd_linger_request * ceph_osdc_watch(struct ceph_osd_client *osdc, diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index f1988387c5ad..3eb0e55665b4 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -410,6 +410,14 @@ enum { enum { CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */ CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */ + CEPH_OSD_OP_FLAG_FADVISE_RANDOM = 0x4, /* the op is random */ + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL = 0x8, /* the op is sequential */ + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED = 0x10,/* data will be accessed in + the near future */ + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED = 0x20,/* data will not be accessed + in the near future */ + CEPH_OSD_OP_FLAG_FADVISE_NOCACHE = 0x40,/* data will be accessed only + once by this client */ }; #define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ @@ -431,6 +439,15 @@ enum { CEPH_OSD_CMPXATTR_MODE_U64 = 2 }; +enum { + CEPH_OSD_COPY_FROM_FLAG_FLUSH = 1, /* part of a flush operation */ + CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY = 2, /* ignore pool overlay */ + CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE = 4, /* ignore osd cache logic */ + CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE = 8, /* map snap direct to + * cloneid */ + CEPH_OSD_COPY_FROM_FLAG_RWORDERED = 16, /* order with write */ +}; + enum { CEPH_OSD_WATCH_OP_UNWATCH = 0, CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1, @@ -497,6 +514,17 @@ struct ceph_osd_op { __le64 expected_object_size; __le64 expected_write_size; } __attribute__ ((packed)) alloc_hint; + struct { + __le64 snapid; + __le64 src_version; + __u8 flags; /* CEPH_OSD_COPY_FROM_FLAG_* */ + /* + * CEPH_OSD_OP_FLAG_FADVISE_*: fadvise flags + * for src object, flags for dest object are in + * ceph_osd_op::flags. + */ + __le32 src_fadvise_flags; + } __attribute__ ((packed)) copy_from; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a0148de51cc6..d23a9f81f3d7 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -410,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_LIST_WATCHERS: ceph_osd_data_release(&op->list_watchers.response_data); break; + case CEPH_OSD_OP_COPY_FROM: + ceph_osd_data_release(&op->copy_from.osd_data); + break; default: break; } @@ -702,6 +705,7 @@ static void get_num_data_items(struct ceph_osd_request *req, case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_CMPXATTR: case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_COPY_FROM: *num_request_data_items += 1; break; @@ -1016,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_CREATE: case CEPH_OSD_OP_DELETE: break; + case CEPH_OSD_OP_COPY_FROM: + dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid); + dst->copy_from.src_version = + cpu_to_le64(src->copy_from.src_version); + dst->copy_from.flags = src->copy_from.flags; + dst->copy_from.src_fadvise_flags = + cpu_to_le32(src->copy_from.src_fadvise_flags); + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); @@ -1947,6 +1959,10 @@ static void setup_request_data(struct ceph_osd_request *req) ceph_osdc_msg_data_add(request_msg, &op->notify_ack.request_data); break; + case CEPH_OSD_OP_COPY_FROM: + ceph_osdc_msg_data_add(request_msg, + &op->copy_from.osd_data); + break; /* reply */ case CEPH_OSD_OP_STAT: @@ -5255,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, } EXPORT_SYMBOL(ceph_osdc_writepages); +static int osd_req_op_copy_from_init(struct ceph_osd_request *req, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + u32 dst_fadvise_flags, + u8 copy_from_flags) +{ + struct ceph_osd_req_op *op; + struct page **pages; + void *p, *end; + + pages = ceph_alloc_page_vector(1, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags); + op->copy_from.snapid = src_snapid; + op->copy_from.src_version = src_version; + op->copy_from.flags = copy_from_flags; + op->copy_from.src_fadvise_flags = src_fadvise_flags; + + p = page_address(pages[0]); + end = p + PAGE_SIZE; + ceph_encode_string(&p, end, src_oid->name, src_oid->name_len); + encode_oloc(&p, end, src_oloc); + op->indata_len = PAGE_SIZE - (end - p); + + ceph_osd_data_pages_init(&op->copy_from.osd_data, pages, + op->indata_len, 0, false, true); + return 0; +} + +int ceph_osdc_copy_from(struct ceph_osd_client *osdc, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + struct ceph_object_id *dst_oid, + struct ceph_object_locator *dst_oloc, + u32 dst_fadvise_flags, + u8 copy_from_flags) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); + if (!req) + return -ENOMEM; + + req->r_flags = CEPH_OSD_FLAG_WRITE; + + ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc); + ceph_oid_copy(&req->r_t.base_oid, dst_oid); + + ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid, + src_oloc, src_fadvise_flags, + dst_fadvise_flags, copy_from_flags); + if (ret) + goto out; + + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); + if (ret) + goto out; + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + +out: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_copy_from); + int __init ceph_osdc_setup(void) { size_t size = sizeof(struct ceph_osd_request) + From 503f82a9932d311c12430779627f7a130bbe462a Mon Sep 17 00:00:00 2001 From: Luis Henriques Date: Mon, 15 Oct 2018 16:45:59 +0100 Subject: [PATCH 24/25] ceph: support copy_file_range file operation This commit implements support for the copy_file_range syscall in cephfs. It is implemented using the RADOS 'copy-from' operation, which allows to do a remote object copy, without the need to download/upload data from/to the OSDs. Some manual copy may however be required if the source/destination file offsets aren't object aligned or if the copy length is smaller than the object size. Signed-off-by: Luis Henriques Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/file.c | 294 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 293 insertions(+), 1 deletion(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0fa6b6b6ccbc..5557ec6760ea 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 #include +#include #include #include @@ -1795,6 +1796,297 @@ unlock: return ret; } +/* + * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for + * src_ci. Two attempts are made to obtain both caps, and an error is return if + * this fails; zero is returned on success. + */ +static int get_rd_wr_caps(struct ceph_inode_info *src_ci, + loff_t src_endoff, int *src_got, + struct ceph_inode_info *dst_ci, + loff_t dst_endoff, int *dst_got) +{ + int ret = 0; + bool retrying = false; + +retry_caps: + ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, + dst_endoff, dst_got, NULL); + if (ret < 0) + return ret; + + /* + * Since we're already holding the FILE_WR capability for the dst file, + * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some + * retry dance instead to try to get both capabilities. + */ + ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED, + false, src_got); + if (ret <= 0) { + /* Start by dropping dst_ci caps and getting src_ci caps */ + ceph_put_cap_refs(dst_ci, *dst_got); + if (retrying) { + if (!ret) + /* ceph_try_get_caps masks EAGAIN */ + ret = -EAGAIN; + return ret; + } + ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD, + CEPH_CAP_FILE_SHARED, src_endoff, + src_got, NULL); + if (ret < 0) + return ret; + /*... drop src_ci caps too, and retry */ + ceph_put_cap_refs(src_ci, *src_got); + retrying = true; + goto retry_caps; + } + return ret; +} + +static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got, + struct ceph_inode_info *dst_ci, int dst_got) +{ + ceph_put_cap_refs(src_ci, src_got); + ceph_put_cap_refs(dst_ci, dst_got); +} + +/* + * This function does several size-related checks, returning an error if: + * - source file is smaller than off+len + * - destination file size is not OK (inode_newsize_ok()) + * - max bytes quotas is exceeded + */ +static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode, + loff_t src_off, loff_t dst_off, size_t len) +{ + loff_t size, endoff; + + size = i_size_read(src_inode); + /* + * Don't copy beyond source file EOF. Instead of simply setting length + * to (size - src_off), just drop to VFS default implementation, as the + * local i_size may be stale due to other clients writing to the source + * inode. + */ + if (src_off + len > size) { + dout("Copy beyond EOF (%llu + %zu > %llu)\n", + src_off, len, size); + return -EOPNOTSUPP; + } + size = i_size_read(dst_inode); + + endoff = dst_off + len; + if (inode_newsize_ok(dst_inode, endoff)) + return -EOPNOTSUPP; + + if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) + return -EDQUOT; + + return 0; +} + +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, + struct file *dst_file, loff_t dst_off, + size_t len, unsigned int flags) +{ + struct inode *src_inode = file_inode(src_file); + struct inode *dst_inode = file_inode(dst_file); + struct ceph_inode_info *src_ci = ceph_inode(src_inode); + struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); + struct ceph_cap_flush *prealloc_cf; + struct ceph_object_locator src_oloc, dst_oloc; + struct ceph_object_id src_oid, dst_oid; + loff_t endoff = 0, size; + ssize_t ret = -EIO; + u64 src_objnum, dst_objnum, src_objoff, dst_objoff; + u32 src_objlen, dst_objlen, object_size; + int src_got = 0, dst_got = 0, err, dirty; + bool do_final_copy = false; + + if (src_inode == dst_inode) + return -EINVAL; + if (ceph_snap(dst_inode) != CEPH_NOSNAP) + return -EROFS; + + /* + * Some of the checks below will return -EOPNOTSUPP, which will force a + * fallback to the default VFS copy_file_range implementation. This is + * desirable in several cases (for ex, the 'len' is smaller than the + * size of the objects, or in cases where that would be more + * efficient). + */ + + if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || + (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) || + (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) + return -EOPNOTSUPP; + + if (len < src_ci->i_layout.object_size) + return -EOPNOTSUPP; /* no remote copy will be done */ + + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + + /* Start by sync'ing the source file */ + ret = file_write_and_wait_range(src_file, src_off, (src_off + len)); + if (ret < 0) + goto out; + + /* + * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other + * clients may have dirty data in their caches. And OSDs know nothing + * about caps, so they can't safely do the remote object copies. + */ + err = get_rd_wr_caps(src_ci, (src_off + len), &src_got, + dst_ci, (dst_off + len), &dst_got); + if (err < 0) { + dout("get_rd_wr_caps returned %d\n", err); + ret = -EOPNOTSUPP; + goto out; + } + + ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); + if (ret < 0) + goto out_caps; + + size = i_size_read(dst_inode); + endoff = dst_off + len; + + /* Drop dst file cached pages */ + ret = invalidate_inode_pages2_range(dst_inode->i_mapping, + dst_off >> PAGE_SHIFT, + endoff >> PAGE_SHIFT); + if (ret < 0) { + dout("Failed to invalidate inode pages (%zd)\n", ret); + ret = 0; /* XXX */ + } + src_oloc.pool = src_ci->i_layout.pool_id; + src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); + dst_oloc.pool = dst_ci->i_layout.pool_id; + dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); + + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, + src_ci->i_layout.object_size, + &src_objnum, &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + dst_ci->i_layout.object_size, + &dst_objnum, &dst_objoff, &dst_objlen); + /* object-level offsets need to the same */ + if (src_objoff != dst_objoff) { + ret = -EOPNOTSUPP; + goto out_caps; + } + + /* + * Do a manual copy if the object offset isn't object aligned. + * 'src_objlen' contains the bytes left until the end of the object, + * starting at the src_off + */ + if (src_objoff) { + /* + * we need to temporarily drop all caps as we'll be calling + * {read,write}_iter, which will get caps again. + */ + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + ret = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, src_objlen, flags); + if (ret < 0) { + dout("do_splice_direct returned %d\n", err); + goto out; + } + len -= ret; + err = get_rd_wr_caps(src_ci, (src_off + len), + &src_got, dst_ci, + (dst_off + len), &dst_got); + if (err < 0) + goto out; + err = is_file_size_ok(src_inode, dst_inode, + src_off, dst_off, len); + if (err < 0) + goto out_caps; + } + object_size = src_ci->i_layout.object_size; + while (len >= object_size) { + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, + object_size, &src_objnum, + &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + object_size, &dst_objnum, + &dst_objoff, &dst_objlen); + ceph_oid_init(&src_oid); + ceph_oid_printf(&src_oid, "%llx.%08llx", + src_ci->i_vino.ino, src_objnum); + ceph_oid_init(&dst_oid); + ceph_oid_printf(&dst_oid, "%llx.%08llx", + dst_ci->i_vino.ino, dst_objnum); + /* Do an object remote copy */ + err = ceph_osdc_copy_from( + &ceph_inode_to_client(src_inode)->client->osdc, + src_ci->i_vino.snap, 0, + &src_oid, &src_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_NOCACHE, + &dst_oid, &dst_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0); + if (err) { + dout("ceph_osdc_copy_from returned %d\n", err); + if (!ret) + ret = err; + goto out_caps; + } + len -= object_size; + src_off += object_size; + dst_off += object_size; + ret += object_size; + } + + if (len) + /* We still need one final local copy */ + do_final_copy = true; + + file_update_time(dst_file); + if (endoff > size) { + int caps_flags = 0; + + /* Let the MDS know about dst file size change */ + if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff)) + caps_flags |= CHECK_CAPS_NODELAY; + if (ceph_inode_set_size(dst_inode, endoff)) + caps_flags |= CHECK_CAPS_AUTHONLY; + if (caps_flags) + ceph_check_caps(dst_ci, caps_flags, NULL); + } + /* Mark Fw dirty */ + spin_lock(&dst_ci->i_ceph_lock); + dst_ci->i_inline_version = CEPH_INLINE_NONE; + dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); + spin_unlock(&dst_ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(dst_inode, dirty); + +out_caps: + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + + if (do_final_copy) { + err = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, len, flags); + if (err < 0) { + dout("do_splice_direct returned %d\n", err); + goto out; + } + len -= err; + ret += err; + } + +out: + ceph_free_cap_flush(prealloc_cf); + + return ret; +} + const struct file_operations ceph_file_fops = { .open = ceph_open, .release = ceph_release, @@ -1810,5 +2102,5 @@ const struct file_operations ceph_file_fops = { .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, .fallocate = ceph_fallocate, + .copy_file_range = ceph_copy_file_range, }; - From ea4cdc548e5e74a529cdd1aea885d74b4aa8f1b3 Mon Sep 17 00:00:00 2001 From: Luis Henriques Date: Mon, 15 Oct 2018 16:46:00 +0100 Subject: [PATCH 25/25] ceph: new mount option to disable usage of copy-from op Add a new mount option 'nocopyfrom' that will prevent the usage of the RADOS 'copy-from' operation in cephfs. This could be useful, for example, for an administrator to temporarily mitigate any possible bugs in the 'copy-from' implementation. Currently, only copy_file_range uses this RADOS operation. Setting this mount option will result in this syscall reverting to the default VFS implementation, i.e. to perform the copies locally instead of doing remote object copies. Signed-off-by: Luis Henriques Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- Documentation/filesystems/ceph.txt | 5 +++++ fs/ceph/file.c | 3 +++ fs/ceph/super.c | 13 +++++++++++++ fs/ceph/super.h | 1 + 4 files changed, 22 insertions(+) diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt index 8bf62240e10d..1177052701e1 100644 --- a/Documentation/filesystems/ceph.txt +++ b/Documentation/filesystems/ceph.txt @@ -151,6 +151,11 @@ Mount Options Report overall filesystem usage in statfs instead of using the root directory quota. + nocopyfrom + Don't use the RADOS 'copy-from' operation to perform remote object + copies. Currently, it's only used in copy_file_range, which will revert + to the default VFS implementation if this option is used. + More Information ================ diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 5557ec6760ea..f788496fafcc 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1917,6 +1917,9 @@ static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, * efficient). */ + if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM)) + return -EOPNOTSUPP; + if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) || (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index eab1359d0553..b5ecd6f50360 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -165,6 +165,8 @@ enum { Opt_noacl, Opt_quotadf, Opt_noquotadf, + Opt_copyfrom, + Opt_nocopyfrom, }; static match_table_t fsopt_tokens = { @@ -203,6 +205,8 @@ static match_table_t fsopt_tokens = { {Opt_noacl, "noacl"}, {Opt_quotadf, "quotadf"}, {Opt_noquotadf, "noquotadf"}, + {Opt_copyfrom, "copyfrom"}, + {Opt_nocopyfrom, "nocopyfrom"}, {-1, NULL} }; @@ -355,6 +359,12 @@ static int parse_fsopt_token(char *c, void *private) case Opt_noquotadf: fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF; break; + case Opt_copyfrom: + fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM; + break; + case Opt_nocopyfrom: + fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM; + break; #ifdef CONFIG_CEPH_FS_POSIX_ACL case Opt_acl: fsopt->sb_flags |= SB_POSIXACL; @@ -553,6 +563,9 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",noacl"); #endif + if (fsopt->flags & CEPH_MOUNT_OPT_NOCOPYFROM) + seq_puts(m, ",nocopyfrom"); + if (fsopt->mds_namespace) seq_show_option(m, "mds_namespace", fsopt->mds_namespace); if (fsopt->wsize != CEPH_MAX_WRITE_SIZE) diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 91b13400badd..c005a5400f2e 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -40,6 +40,7 @@ #define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ #define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */ #define CEPH_MOUNT_OPT_NOQUOTADF (1<<13) /* no root dir quota in statfs */ +#define CEPH_MOUNT_OPT_NOCOPYFROM (1<<14) /* don't use RADOS 'copy-from' op */ #define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE