fs: dlm: fix multiple empty writequeue alloc

This patch will add a mutex that a connection can allocate a writequeue
entry buffer only at a sleepable context at one time. If multiple caller
waits at the writequeue spinlock and the spinlock gets release it could
be that multiple new writequeue page buffers were allocated instead of
allocate one writequeue page buffer and other waiters will use remaining
buffer of it. It will only be the case for sleepable context which is
the common case. In non-sleepable contexts like retransmission we just
don't care about such behaviour.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
This commit is contained in:
Alexander Aring 2021-07-16 16:22:44 -04:00 committed by David Teigland
parent 8728a455d2
commit c51b022179

View File

@ -84,6 +84,7 @@ struct connection {
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
atomic_t writequeue_cnt;
struct mutex wq_alloc;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@ -264,6 +265,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL;
}
mutex_init(&con->wq_alloc);
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
@ -1251,19 +1254,37 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
{
struct writequeue_entry *e;
struct dlm_msg *msg;
bool sleepable;
msg = kzalloc(sizeof(*msg), allocation);
if (!msg)
return NULL;
/* this mutex is being used as a wait to avoid multiple "fast"
* new writequeue page list entry allocs in new_wq_entry in
* normal operation which is sleepable context. Without it
* we could end in multiple writequeue entries with one
* dlm message because multiple callers were waiting at
* the writequeue_lock in new_wq_entry().
*/
sleepable = gfpflags_normal_context(allocation);
if (sleepable)
mutex_lock(&con->wq_alloc);
kref_init(&msg->ref);
e = new_wq_entry(con, len, allocation, ppc, cb, mh);
if (!e) {
if (sleepable)
mutex_unlock(&con->wq_alloc);
kfree(msg);
return NULL;
}
if (sleepable)
mutex_unlock(&con->wq_alloc);
msg->ppc = *ppc;
msg->len = len;
msg->entry = e;