Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph fixes from Sage Weil:
 "There is a lifecycle fix in the auth code, a fix for a narrow race
  condition on map, and a helpful message in the log when there is a
  feature mismatch (which happens frequently now that the default
  server-side options have changed)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: report unsupported features to syslog
  rbd: fix rbd map vs notify races
  libceph: make authorizer destruction independent of ceph_auth_client
This commit is contained in:
Linus Torvalds 2016-04-28 18:59:24 -07:00
commit 6fa9bffbcc
10 changed files with 87 additions and 92 deletions

View File

@ -538,7 +538,6 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
u8 *order, u64 *snap_size); u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
u64 *snap_features); u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
static int rbd_open(struct block_device *bdev, fmode_t mode) static int rbd_open(struct block_device *bdev, fmode_t mode)
{ {
@ -3127,9 +3126,6 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
struct rbd_device *rbd_dev = (struct rbd_device *)data; struct rbd_device *rbd_dev = (struct rbd_device *)data;
int ret; int ret;
if (!rbd_dev)
return;
dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
rbd_dev->header_name, (unsigned long long)notify_id, rbd_dev->header_name, (unsigned long long)notify_id,
(unsigned int)opcode); (unsigned int)opcode);
@ -3263,6 +3259,9 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
ceph_osdc_cancel_event(rbd_dev->watch_event); ceph_osdc_cancel_event(rbd_dev->watch_event);
rbd_dev->watch_event = NULL; rbd_dev->watch_event = NULL;
dout("%s flushing notifies\n", __func__);
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
} }
/* /*
@ -3642,21 +3641,14 @@ static void rbd_exists_validate(struct rbd_device *rbd_dev)
static void rbd_dev_update_size(struct rbd_device *rbd_dev) static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{ {
sector_t size; sector_t size;
bool removing;
/* /*
* Don't hold the lock while doing disk operations, * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
* or lock ordering will conflict with the bdev mutex via: * try to update its size. If REMOVING is set, updating size
* rbd_add() -> blkdev_get() -> rbd_open() * is just useless work since the device can't be opened.
*/ */
spin_lock_irq(&rbd_dev->lock); if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags); !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
spin_unlock_irq(&rbd_dev->lock);
/*
* If the device is being removed, rbd_dev->disk has
* been destroyed, so don't try to update its size
*/
if (!removing) {
size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE; size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
dout("setting size to %llu sectors", (unsigned long long)size); dout("setting size to %llu sectors", (unsigned long long)size);
set_capacity(rbd_dev->disk, size); set_capacity(rbd_dev->disk, size);
@ -4191,7 +4183,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
__le64 features; __le64 features;
__le64 incompat; __le64 incompat;
} __attribute__ ((packed)) features_buf = { 0 }; } __attribute__ ((packed)) features_buf = { 0 };
u64 incompat; u64 unsup;
int ret; int ret;
ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
@ -4204,9 +4196,12 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
if (ret < sizeof (features_buf)) if (ret < sizeof (features_buf))
return -ERANGE; return -ERANGE;
incompat = le64_to_cpu(features_buf.incompat); unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
if (incompat & ~RBD_FEATURES_SUPPORTED) if (unsup) {
rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
unsup);
return -ENXIO; return -ENXIO;
}
*snap_features = le64_to_cpu(features_buf.features); *snap_features = le64_to_cpu(features_buf.features);
@ -5187,6 +5182,10 @@ out_err:
return ret; return ret;
} }
/*
* rbd_dev->header_rwsem must be locked for write and will be unlocked
* upon return.
*/
static int rbd_dev_device_setup(struct rbd_device *rbd_dev) static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{ {
int ret; int ret;
@ -5195,7 +5194,7 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
ret = rbd_dev_id_get(rbd_dev); ret = rbd_dev_id_get(rbd_dev);
if (ret) if (ret)
return ret; goto err_out_unlock;
BUILD_BUG_ON(DEV_NAME_LEN BUILD_BUG_ON(DEV_NAME_LEN
< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
@ -5236,8 +5235,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
/* Everything's ready. Announce the disk to the world. */ /* Everything's ready. Announce the disk to the world. */
set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
add_disk(rbd_dev->disk); up_write(&rbd_dev->header_rwsem);
add_disk(rbd_dev->disk);
pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
(unsigned long long) rbd_dev->mapping.size); (unsigned long long) rbd_dev->mapping.size);
@ -5252,6 +5252,8 @@ err_out_blkdev:
unregister_blkdev(rbd_dev->major, rbd_dev->name); unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id: err_out_id:
rbd_dev_id_put(rbd_dev); rbd_dev_id_put(rbd_dev);
err_out_unlock:
up_write(&rbd_dev->header_rwsem);
return ret; return ret;
} }
@ -5442,6 +5444,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
spec = NULL; /* rbd_dev now owns this */ spec = NULL; /* rbd_dev now owns this */
rbd_opts = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */
down_write(&rbd_dev->header_rwsem);
rc = rbd_dev_image_probe(rbd_dev, 0); rc = rbd_dev_image_probe(rbd_dev, 0);
if (rc < 0) if (rc < 0)
goto err_out_rbd_dev; goto err_out_rbd_dev;
@ -5471,6 +5474,7 @@ out:
return rc; return rc;
err_out_rbd_dev: err_out_rbd_dev:
up_write(&rbd_dev->header_rwsem);
rbd_dev_destroy(rbd_dev); rbd_dev_destroy(rbd_dev);
err_out_client: err_out_client:
rbd_put_client(rbdc); rbd_put_client(rbdc);
@ -5577,12 +5581,6 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
return ret; return ret;
rbd_dev_header_unwatch_sync(rbd_dev); rbd_dev_header_unwatch_sync(rbd_dev);
/*
* flush remaining watch callbacks - these must be complete
* before the osd_client is shutdown
*/
dout("%s: flushing notifies", __func__);
ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
/* /*
* Don't free anything from rbd_dev->disk until after all * Don't free anything from rbd_dev->disk until after all

View File

@ -386,9 +386,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
if (atomic_dec_and_test(&s->s_ref)) { if (atomic_dec_and_test(&s->s_ref)) {
if (s->s_auth.authorizer) if (s->s_auth.authorizer)
ceph_auth_destroy_authorizer( ceph_auth_destroy_authorizer(s->s_auth.authorizer);
s->s_mdsc->fsc->client->monc.auth,
s->s_auth.authorizer);
kfree(s); kfree(s);
} }
} }
@ -3900,7 +3898,7 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
struct ceph_auth_handshake *auth = &s->s_auth; struct ceph_auth_handshake *auth = &s->s_auth;
if (force_new && auth->authorizer) { if (force_new && auth->authorizer) {
ceph_auth_destroy_authorizer(ac, auth->authorizer); ceph_auth_destroy_authorizer(auth->authorizer);
auth->authorizer = NULL; auth->authorizer = NULL;
} }
if (!auth->authorizer) { if (!auth->authorizer) {

View File

@ -12,9 +12,12 @@
*/ */
struct ceph_auth_client; struct ceph_auth_client;
struct ceph_authorizer;
struct ceph_msg; struct ceph_msg;
struct ceph_authorizer {
void (*destroy)(struct ceph_authorizer *);
};
struct ceph_auth_handshake { struct ceph_auth_handshake {
struct ceph_authorizer *authorizer; struct ceph_authorizer *authorizer;
void *authorizer_buf; void *authorizer_buf;
@ -62,8 +65,6 @@ struct ceph_auth_client_ops {
struct ceph_auth_handshake *auth); struct ceph_auth_handshake *auth);
int (*verify_authorizer_reply)(struct ceph_auth_client *ac, int (*verify_authorizer_reply)(struct ceph_auth_client *ac,
struct ceph_authorizer *a, size_t len); struct ceph_authorizer *a, size_t len);
void (*destroy_authorizer)(struct ceph_auth_client *ac,
struct ceph_authorizer *a);
void (*invalidate_authorizer)(struct ceph_auth_client *ac, void (*invalidate_authorizer)(struct ceph_auth_client *ac,
int peer_type); int peer_type);
@ -112,8 +113,7 @@ extern int ceph_auth_is_authenticated(struct ceph_auth_client *ac);
extern int ceph_auth_create_authorizer(struct ceph_auth_client *ac, extern int ceph_auth_create_authorizer(struct ceph_auth_client *ac,
int peer_type, int peer_type,
struct ceph_auth_handshake *auth); struct ceph_auth_handshake *auth);
extern void ceph_auth_destroy_authorizer(struct ceph_auth_client *ac, void ceph_auth_destroy_authorizer(struct ceph_authorizer *a);
struct ceph_authorizer *a);
extern int ceph_auth_update_authorizer(struct ceph_auth_client *ac, extern int ceph_auth_update_authorizer(struct ceph_auth_client *ac,
int peer_type, int peer_type,
struct ceph_auth_handshake *a); struct ceph_auth_handshake *a);

View File

@ -16,7 +16,6 @@ struct ceph_msg;
struct ceph_snap_context; struct ceph_snap_context;
struct ceph_osd_request; struct ceph_osd_request;
struct ceph_osd_client; struct ceph_osd_client;
struct ceph_authorizer;
/* /*
* completion callback for async writepages * completion callback for async writepages

View File

@ -293,13 +293,9 @@ int ceph_auth_create_authorizer(struct ceph_auth_client *ac,
} }
EXPORT_SYMBOL(ceph_auth_create_authorizer); EXPORT_SYMBOL(ceph_auth_create_authorizer);
void ceph_auth_destroy_authorizer(struct ceph_auth_client *ac, void ceph_auth_destroy_authorizer(struct ceph_authorizer *a)
struct ceph_authorizer *a)
{ {
mutex_lock(&ac->mutex); a->destroy(a);
if (ac->ops && ac->ops->destroy_authorizer)
ac->ops->destroy_authorizer(ac, a);
mutex_unlock(&ac->mutex);
} }
EXPORT_SYMBOL(ceph_auth_destroy_authorizer); EXPORT_SYMBOL(ceph_auth_destroy_authorizer);

View File

@ -16,7 +16,6 @@ static void reset(struct ceph_auth_client *ac)
struct ceph_auth_none_info *xi = ac->private; struct ceph_auth_none_info *xi = ac->private;
xi->starting = true; xi->starting = true;
xi->built_authorizer = false;
} }
static void destroy(struct ceph_auth_client *ac) static void destroy(struct ceph_auth_client *ac)
@ -39,6 +38,27 @@ static int should_authenticate(struct ceph_auth_client *ac)
return xi->starting; return xi->starting;
} }
static int ceph_auth_none_build_authorizer(struct ceph_auth_client *ac,
struct ceph_none_authorizer *au)
{
void *p = au->buf;
void *const end = p + sizeof(au->buf);
int ret;
ceph_encode_8_safe(&p, end, 1, e_range);
ret = ceph_entity_name_encode(ac->name, &p, end);
if (ret < 0)
return ret;
ceph_encode_64_safe(&p, end, ac->global_id, e_range);
au->buf_len = p - (void *)au->buf;
dout("%s built authorizer len %d\n", __func__, au->buf_len);
return 0;
e_range:
return -ERANGE;
}
static int build_request(struct ceph_auth_client *ac, void *buf, void *end) static int build_request(struct ceph_auth_client *ac, void *buf, void *end)
{ {
return 0; return 0;
@ -57,32 +77,32 @@ static int handle_reply(struct ceph_auth_client *ac, int result,
return result; return result;
} }
static void ceph_auth_none_destroy_authorizer(struct ceph_authorizer *a)
{
kfree(a);
}
/* /*
* build an 'authorizer' with our entity_name and global_id. we can * build an 'authorizer' with our entity_name and global_id. it is
* reuse a single static copy since it is identical for all services * identical for all services we connect to.
* we connect to.
*/ */
static int ceph_auth_none_create_authorizer( static int ceph_auth_none_create_authorizer(
struct ceph_auth_client *ac, int peer_type, struct ceph_auth_client *ac, int peer_type,
struct ceph_auth_handshake *auth) struct ceph_auth_handshake *auth)
{ {
struct ceph_auth_none_info *ai = ac->private; struct ceph_none_authorizer *au;
struct ceph_none_authorizer *au = &ai->au;
void *p, *end;
int ret; int ret;
if (!ai->built_authorizer) { au = kmalloc(sizeof(*au), GFP_NOFS);
p = au->buf; if (!au)
end = p + sizeof(au->buf); return -ENOMEM;
ceph_encode_8(&p, 1);
ret = ceph_entity_name_encode(ac->name, &p, end - 8); au->base.destroy = ceph_auth_none_destroy_authorizer;
if (ret < 0)
goto bad; ret = ceph_auth_none_build_authorizer(ac, au);
ceph_decode_need(&p, end, sizeof(u64), bad2); if (ret) {
ceph_encode_64(&p, ac->global_id); kfree(au);
au->buf_len = p - (void *)au->buf; return ret;
ai->built_authorizer = true;
dout("built authorizer len %d\n", au->buf_len);
} }
auth->authorizer = (struct ceph_authorizer *) au; auth->authorizer = (struct ceph_authorizer *) au;
@ -92,17 +112,6 @@ static int ceph_auth_none_create_authorizer(
auth->authorizer_reply_buf_len = sizeof (au->reply_buf); auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
return 0; return 0;
bad2:
ret = -ERANGE;
bad:
return ret;
}
static void ceph_auth_none_destroy_authorizer(struct ceph_auth_client *ac,
struct ceph_authorizer *a)
{
/* nothing to do */
} }
static const struct ceph_auth_client_ops ceph_auth_none_ops = { static const struct ceph_auth_client_ops ceph_auth_none_ops = {
@ -114,7 +123,6 @@ static const struct ceph_auth_client_ops ceph_auth_none_ops = {
.build_request = build_request, .build_request = build_request,
.handle_reply = handle_reply, .handle_reply = handle_reply,
.create_authorizer = ceph_auth_none_create_authorizer, .create_authorizer = ceph_auth_none_create_authorizer,
.destroy_authorizer = ceph_auth_none_destroy_authorizer,
}; };
int ceph_auth_none_init(struct ceph_auth_client *ac) int ceph_auth_none_init(struct ceph_auth_client *ac)
@ -127,7 +135,6 @@ int ceph_auth_none_init(struct ceph_auth_client *ac)
return -ENOMEM; return -ENOMEM;
xi->starting = true; xi->starting = true;
xi->built_authorizer = false;
ac->protocol = CEPH_AUTH_NONE; ac->protocol = CEPH_AUTH_NONE;
ac->private = xi; ac->private = xi;

View File

@ -12,6 +12,7 @@
*/ */
struct ceph_none_authorizer { struct ceph_none_authorizer {
struct ceph_authorizer base;
char buf[128]; char buf[128];
int buf_len; int buf_len;
char reply_buf[0]; char reply_buf[0];
@ -19,8 +20,6 @@ struct ceph_none_authorizer {
struct ceph_auth_none_info { struct ceph_auth_none_info {
bool starting; bool starting;
bool built_authorizer;
struct ceph_none_authorizer au; /* we only need one; it's static */
}; };
int ceph_auth_none_init(struct ceph_auth_client *ac); int ceph_auth_none_init(struct ceph_auth_client *ac);

View File

@ -565,6 +565,14 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result,
return -EAGAIN; return -EAGAIN;
} }
static void ceph_x_destroy_authorizer(struct ceph_authorizer *a)
{
struct ceph_x_authorizer *au = (void *)a;
ceph_x_authorizer_cleanup(au);
kfree(au);
}
static int ceph_x_create_authorizer( static int ceph_x_create_authorizer(
struct ceph_auth_client *ac, int peer_type, struct ceph_auth_client *ac, int peer_type,
struct ceph_auth_handshake *auth) struct ceph_auth_handshake *auth)
@ -581,6 +589,8 @@ static int ceph_x_create_authorizer(
if (!au) if (!au)
return -ENOMEM; return -ENOMEM;
au->base.destroy = ceph_x_destroy_authorizer;
ret = ceph_x_build_authorizer(ac, th, au); ret = ceph_x_build_authorizer(ac, th, au);
if (ret) { if (ret) {
kfree(au); kfree(au);
@ -643,16 +653,6 @@ static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
return ret; return ret;
} }
static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac,
struct ceph_authorizer *a)
{
struct ceph_x_authorizer *au = (void *)a;
ceph_x_authorizer_cleanup(au);
kfree(au);
}
static void ceph_x_reset(struct ceph_auth_client *ac) static void ceph_x_reset(struct ceph_auth_client *ac)
{ {
struct ceph_x_info *xi = ac->private; struct ceph_x_info *xi = ac->private;
@ -770,7 +770,6 @@ static const struct ceph_auth_client_ops ceph_x_ops = {
.create_authorizer = ceph_x_create_authorizer, .create_authorizer = ceph_x_create_authorizer,
.update_authorizer = ceph_x_update_authorizer, .update_authorizer = ceph_x_update_authorizer,
.verify_authorizer_reply = ceph_x_verify_authorizer_reply, .verify_authorizer_reply = ceph_x_verify_authorizer_reply,
.destroy_authorizer = ceph_x_destroy_authorizer,
.invalidate_authorizer = ceph_x_invalidate_authorizer, .invalidate_authorizer = ceph_x_invalidate_authorizer,
.reset = ceph_x_reset, .reset = ceph_x_reset,
.destroy = ceph_x_destroy, .destroy = ceph_x_destroy,

View File

@ -26,6 +26,7 @@ struct ceph_x_ticket_handler {
struct ceph_x_authorizer { struct ceph_x_authorizer {
struct ceph_authorizer base;
struct ceph_crypto_key session_key; struct ceph_crypto_key session_key;
struct ceph_buffer *buf; struct ceph_buffer *buf;
unsigned int service; unsigned int service;

View File

@ -1087,10 +1087,8 @@ static void put_osd(struct ceph_osd *osd)
dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
atomic_read(&osd->o_ref) - 1); atomic_read(&osd->o_ref) - 1);
if (atomic_dec_and_test(&osd->o_ref)) { if (atomic_dec_and_test(&osd->o_ref)) {
struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
if (osd->o_auth.authorizer) if (osd->o_auth.authorizer)
ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
kfree(osd); kfree(osd);
} }
} }
@ -2984,7 +2982,7 @@ static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
struct ceph_auth_handshake *auth = &o->o_auth; struct ceph_auth_handshake *auth = &o->o_auth;
if (force_new && auth->authorizer) { if (force_new && auth->authorizer) {
ceph_auth_destroy_authorizer(ac, auth->authorizer); ceph_auth_destroy_authorizer(auth->authorizer);
auth->authorizer = NULL; auth->authorizer = NULL;
} }
if (!auth->authorizer) { if (!auth->authorizer) {