dlm for 5.15

This set includes a number of minor fixes and cleanups
 related to the networking changes in the last release.
 A patch to delay ack messages reduces network traffic
 significantly.
 -----BEGIN PGP SIGNATURE-----
 
 iQIcBAABAgAGBQJhMNuAAAoJEDgbc8f8gGmqHR8P/igKtFfulNhXcPeya1/aThQH
 yvTt+q1dzz5oySKSpf1YBFBZ5xT9X0tg+umgAfFmPLv237mXM2W4W5LcVtExxVih
 Uj7ESFen9dfSCgEuFrYa16XqMGPjEdY5p49K5ZhBfDQIIO/AIHevQuShHskOHcmr
 c3qBOUdxQCQXRMkZ35VQHUXh4pBAXVbU4vL3SMDA0MQCYoIfAySUoEuJjXRuuSrY
 sRl4rZodthBFHyAmI0CRD5p3rSUkCfboOwecC0GtiIVdwG1k3piQSOkBJfMDC14X
 cbhaFxbsYXWfkJPjiaCSPbI8UqBWyAgAGaVIbVQj+b73ncUc4o+1ZNwFfoZ7wooT
 H89mCwonWn4L1SAAyfH3pHdnzFjqiFrR8f5RJSlqExHgnPubj1PE0TdINoyKThcQ
 79Ht+IvKVCBvLrGOCV/2XcjT/IjdVUADXe+YCB9Y7CK4kMDoqrRDq0B/jBGbgY8q
 PEouuYmroKF1CR5ioFpgL7lXdpmLWKb1WnnRI5MFNFL/wwCxDHxz9LVs6GBsNFJ0
 b+af7NMB1TnGAjFxsLMdKOaADAZdvdkzEBVvYzzNo1KoiZJZTiepkCeghZ5u2YJ1
 lt95afaIRbFMRvxKxDx7CDIa2NKlD+odLKVd0nr8snJmdx0afoWvZsMBCVjtlZNf
 JtrdvApemYwWCLbuBKgL
 =mHIc
 -----END PGP SIGNATURE-----

Merge tag 'dlm-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

Pull dlm updates from David Teigland:
 "This set includes a number of minor fixes and cleanups related to the
  networking changes in the last release.

  A patch to delay ack messages reduces network traffic significantly"

* tag 'dlm-5.15' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  fs: dlm: avoid comms shutdown delay in release_lockspace
  fs: dlm: fix return -EINTR on recovery stopped
  fs: dlm: implement delayed ack handling
  fs: dlm: move receive loop into receive handler
  fs: dlm: fix multiple empty writequeue alloc
  fs: dlm: generic connect func
  fs: dlm: auto load sctp module
  fs: dlm: introduce generic listen
  fs: dlm: move to static proto ops
  fs: dlm: introduce con_next_wq helper
  fs: dlm: cleanup and remove _send_rcom
  fs: dlm: clear CF_APP_LIMITED on close
  fs: dlm: fix typo in tlv prefix
  fs: dlm: use READ_ONCE for config var
  fs: dlm: use sk->sk_socket instead of con->sock
This commit is contained in:
Linus Torvalds 2021-09-02 10:19:45 -07:00
commit 265113f70f
9 changed files with 456 additions and 413 deletions

View File

@ -85,8 +85,10 @@ int dlm_recover_directory(struct dlm_ls *ls)
for (;;) {
int left;
error = dlm_recovery_stopped(ls);
if (error)
if (error) {
error = -EINTR;
goto out_free;
}
error = dlm_rcom_names(ls, memb->nodeid,
last_name, last_len);

View File

@ -468,7 +468,7 @@ struct dlm_rcom {
struct dlm_opt_header {
uint16_t t_type;
uint16_t t_length;
uint32_t o_pad;
uint32_t t_pad;
/* need to be 8 byte aligned */
char t_value[];
};

View File

@ -498,7 +498,7 @@ static int new_lockspace(const char *name, const char *cluster,
ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
DLM_LSFL_NEWEXCL));
size = dlm_config.ci_rsbtbl_size;
size = READ_ONCE(dlm_config.ci_rsbtbl_size);
ls->ls_rsbtbl_size = size;
ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
@ -793,6 +793,7 @@ static int release_lockspace(struct dlm_ls *ls, int force)
if (ls_count == 1) {
dlm_scand_stop();
dlm_clear_members(ls);
dlm_midcomms_shutdown();
}

View File

@ -84,9 +84,7 @@ struct connection {
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
atomic_t writequeue_cnt;
void (*connect_action) (struct connection *); /* What to do to connect */
void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
bool (*eof_condition)(struct connection *con); /* What to do to eof check */
struct mutex wq_alloc;
int retries;
#define MAX_CONNECT_RETRIES 3
struct hlist_node list;
@ -145,6 +143,24 @@ struct dlm_node_addr {
struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
/* Per-transport operations table. One instance exists per supported
 * protocol and the file-level dlm_proto_ops pointer selects the active one.
 */
struct dlm_proto_ops {
/* forwarded as the fourth argument of nodeid_to_addr() when connecting */
bool try_new_addr;
/* protocol name, printed in the "Using %s for communications" log line */
const char *name;
/* protocol number handed to sock_create_kern() */
int proto;
/* start the outgoing connection on an already created and bound socket;
 * a negative return value is treated as failure by dlm_connect()
 */
int (*connect)(struct connection *con, struct socket *sock,
struct sockaddr *addr, int addr_len);
/* socket options for outgoing sockets, applied right after creation */
void (*sockopts)(struct socket *sock);
/* local bind for outgoing sockets, called before connect() */
int (*bind)(struct socket *sock);
/* checked first when starting to listen; negative return aborts */
int (*listen_validate)(void);
/* socket options for the listen socket */
void (*listen_sockopts)(struct socket *sock);
/* bind the listen socket; negative return aborts */
int (*listen_bind)(struct socket *sock);
/* What to do to shutdown; optional, callers check for NULL */
void (*shutdown_action)(struct connection *con);
/* What to do to eof check; optional, callers check for NULL */
bool (*eof_condition)(struct connection *con);
};
static struct listen_sock_callbacks {
void (*sk_error_report)(struct sock *);
void (*sk_data_ready)(struct sock *);
@ -168,12 +184,26 @@ static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);
static const struct dlm_proto_ops *dlm_proto_ops;
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void sctp_connect_to_sock(struct connection *con);
static void tcp_connect_to_sock(struct connection *con);
static void dlm_tcp_shutdown(struct connection *con);
/* Return the head of con->writequeue if it carries data to send, or NULL
 * when the queue is empty or its head entry has a length of zero.
 *
 * The caller must hold con->writequeue_lock.
 */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *head = NULL;

	if (!list_empty(&con->writequeue)) {
		head = list_first_entry(&con->writequeue,
					struct writequeue_entry, list);
		/* a head entry without payload is not ready to be sent */
		if (!head->len)
			head = NULL;
	}

	return head;
}
static struct connection *__find_con(int nodeid, int r)
{
@ -208,20 +238,6 @@ static int dlm_con_init(struct connection *con, int nodeid)
INIT_WORK(&con->rwork, process_recv_sockets);
init_waitqueue_head(&con->shutdown_wait);
switch (dlm_config.ci_protocol) {
case DLM_PROTO_TCP:
con->connect_action = tcp_connect_to_sock;
con->shutdown_action = dlm_tcp_shutdown;
con->eof_condition = tcp_eof_condition;
break;
case DLM_PROTO_SCTP:
con->connect_action = sctp_connect_to_sock;
break;
default:
kfree(con->rx_buf);
return -EINVAL;
}
return 0;
}
@ -249,6 +265,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
return NULL;
}
mutex_init(&con->wq_alloc);
spin_lock(&connections_lock);
/* Because multiple workqueues/threads calls this function it can
* race on multiple cpu's. Instead of locking hot path __find_con()
@ -583,8 +601,7 @@ static void lowcomms_error_report(struct sock *sk)
goto out;
orig_report = listen_sock.sk_error_report;
if (con->sock == NULL ||
kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
if (kernel_getpeername(sk->sk_socket, (struct sockaddr *)&saddr) < 0) {
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
"sending to node %d, port %d, "
"sk_err=%d/%d\n", dlm_our_nodeid(),
@ -801,6 +818,7 @@ static void close_connection(struct connection *con, bool and_other,
con->rx_leftover = 0;
con->retries = 0;
clear_bit(CF_APP_LIMITED, &con->flags);
clear_bit(CF_CONNECTED, &con->flags);
clear_bit(CF_DELAY_CONNECT, &con->flags);
clear_bit(CF_RECONNECT, &con->flags);
@ -877,7 +895,6 @@ static int con_realloc_receive_buf(struct connection *con, int newlen)
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
int call_again_soon = 0;
struct msghdr msg;
struct kvec iov;
int ret, buflen;
@ -897,41 +914,40 @@ static int receive_from_sock(struct connection *con)
goto out_resched;
}
/* calculate new buffer parameter regarding last receive and
* possible leftover bytes
*/
iov.iov_base = con->rx_buf + con->rx_leftover;
iov.iov_len = con->rx_buflen - con->rx_leftover;
for (;;) {
/* calculate new buffer parameter regarding last receive and
* possible leftover bytes
*/
iov.iov_base = con->rx_buf + con->rx_leftover;
iov.iov_len = con->rx_buflen - con->rx_leftover;
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
msg.msg_flags);
if (ret <= 0)
goto out_close;
else if (ret == iov.iov_len)
call_again_soon = 1;
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
msg.msg_flags);
if (ret == -EAGAIN)
break;
else if (ret <= 0)
goto out_close;
/* new buflen according readed bytes and leftover from last receive */
buflen = ret + con->rx_leftover;
ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
if (ret < 0)
goto out_close;
/* new buflen according readed bytes and leftover from last receive */
buflen = ret + con->rx_leftover;
ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
if (ret < 0)
goto out_close;
/* calculate leftover bytes from process and put it into begin of
* the receive buffer, so next receive we have the full message
* at the start address of the receive buffer.
*/
con->rx_leftover = buflen - ret;
if (con->rx_leftover) {
memmove(con->rx_buf, con->rx_buf + ret,
con->rx_leftover);
call_again_soon = true;
/* calculate leftover bytes from process and put it into begin of
* the receive buffer, so next receive we have the full message
* at the start address of the receive buffer.
*/
con->rx_leftover = buflen - ret;
if (con->rx_leftover) {
memmove(con->rx_buf, con->rx_buf + ret,
con->rx_leftover);
}
}
if (call_again_soon)
goto out_resched;
dlm_midcomms_receive_done(con->nodeid);
mutex_unlock(&con->sock_mutex);
return 0;
@ -946,7 +962,8 @@ out_close:
log_print("connection %p got EOF from %d",
con, con->nodeid);
if (con->eof_condition && con->eof_condition(con)) {
if (dlm_proto_ops->eof_condition &&
dlm_proto_ops->eof_condition(con)) {
set_bit(CF_EOF, &con->flags);
mutex_unlock(&con->sock_mutex);
} else {
@ -1134,242 +1151,6 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port)
return result;
}
/* Initiate an SCTP association.
This is a special case of send_to_sock() in that we don't yet have a
peeled-off socket for this association, so we use the listening socket
and add the primary IP address of the remote node.
*/
static void sctp_connect_to_sock(struct connection *con)
{
struct sockaddr_storage daddr;
int result;
int addr_len;
struct socket *sock;
unsigned int mark;
mutex_lock(&con->sock_mutex);
/* Some odd races can cause double-connects, ignore them */
if (con->retries++ > MAX_CONNECT_RETRIES)
goto out;
if (con->sock) {
log_print("node %d already connected.", con->nodeid);
goto out;
}
memset(&daddr, 0, sizeof(daddr));
result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
if (result < 0) {
log_print("no address for nodeid %d", con->nodeid);
goto out;
}
/* Create a socket to communicate with */
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, IPPROTO_SCTP, &sock);
if (result < 0)
goto socket_err;
sock_set_mark(sock->sk, mark);
add_sock(sock, con);
/* Bind to all addresses. */
if (sctp_bind_addrs(con->sock, 0))
goto bind_err;
make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
log_print_ratelimited("connecting to %d", con->nodeid);
/* Turn off Nagle's algorithm */
sctp_sock_set_nodelay(sock->sk);
/*
* Make sock->ops->connect() function return in specified time,
* since O_NONBLOCK argument in connect() function does not work here,
* then, we should restore the default value of this attribute.
*/
sock_set_sndtimeo(sock->sk, 5);
result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
0);
sock_set_sndtimeo(sock->sk, 0);
if (result == -EINPROGRESS)
result = 0;
if (result == 0) {
if (!test_and_set_bit(CF_CONNECTED, &con->flags))
log_print("successful connected to node %d", con->nodeid);
goto out;
}
bind_err:
con->sock = NULL;
sock_release(sock);
socket_err:
/*
* Some errors are fatal and this list might need adjusting. For other
* errors we try again until the max number of retries is reached.
*/
if (result != -EHOSTUNREACH &&
result != -ENETUNREACH &&
result != -ENETDOWN &&
result != -EINVAL &&
result != -EPROTONOSUPPORT) {
log_print("connect %d try %d error %d", con->nodeid,
con->retries, result);
mutex_unlock(&con->sock_mutex);
msleep(1000);
lowcomms_connect_sock(con);
return;
}
out:
mutex_unlock(&con->sock_mutex);
}
/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
struct sockaddr_storage saddr, src_addr;
unsigned int mark;
int addr_len;
struct socket *sock = NULL;
int result;
mutex_lock(&con->sock_mutex);
if (con->retries++ > MAX_CONNECT_RETRIES)
goto out;
/* Some odd races can cause double-connects, ignore them */
if (con->sock)
goto out;
/* Create a socket to communicate with */
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, IPPROTO_TCP, &sock);
if (result < 0)
goto out_err;
memset(&saddr, 0, sizeof(saddr));
result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
if (result < 0) {
log_print("no address for nodeid %d", con->nodeid);
goto out_err;
}
sock_set_mark(sock->sk, mark);
add_sock(sock, con);
/* Bind to our cluster-known address connecting to avoid
routing problems */
memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
make_sockaddr(&src_addr, 0, &addr_len);
result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
addr_len);
if (result < 0) {
log_print("could not bind for connect: %d", result);
/* This *may* not indicate a critical error */
}
make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
log_print_ratelimited("connecting to %d", con->nodeid);
/* Turn off Nagle's algorithm */
tcp_sock_set_nodelay(sock->sk);
result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
O_NONBLOCK);
if (result == -EINPROGRESS)
result = 0;
if (result == 0)
goto out;
out_err:
if (con->sock) {
sock_release(con->sock);
con->sock = NULL;
} else if (sock) {
sock_release(sock);
}
/*
* Some errors are fatal and this list might need adjusting. For other
* errors we try again until the max number of retries is reached.
*/
if (result != -EHOSTUNREACH &&
result != -ENETUNREACH &&
result != -ENETDOWN &&
result != -EINVAL &&
result != -EPROTONOSUPPORT) {
log_print("connect %d try %d error %d", con->nodeid,
con->retries, result);
mutex_unlock(&con->sock_mutex);
msleep(1000);
lowcomms_connect_sock(con);
return;
}
out:
mutex_unlock(&con->sock_mutex);
return;
}
/* On error caller must run dlm_close_sock() for the
* listen connection socket.
*/
static int tcp_create_listen_sock(struct listen_connection *con,
struct sockaddr_storage *saddr)
{
struct socket *sock = NULL;
int result = 0;
int addr_len;
if (dlm_local_addr[0]->ss_family == AF_INET)
addr_len = sizeof(struct sockaddr_in);
else
addr_len = sizeof(struct sockaddr_in6);
/* Create a socket to communicate with */
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, IPPROTO_TCP, &sock);
if (result < 0) {
log_print("Can't create listening comms socket");
goto create_out;
}
sock_set_mark(sock->sk, dlm_config.ci_mark);
/* Turn off Nagle's algorithm */
tcp_sock_set_nodelay(sock->sk);
sock_set_reuseaddr(sock->sk);
add_listen_sock(sock, con);
/* Bind to our port */
make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
if (result < 0) {
log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
goto create_out;
}
sock_set_keepalive(sock->sk);
result = sock->ops->listen(sock, 5);
if (result < 0) {
log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
goto create_out;
}
return 0;
create_out:
return result;
}
/* Get local addresses */
static void init_local(void)
{
@ -1396,63 +1177,6 @@ static void deinit_local(void)
kfree(dlm_local_addr[i]);
}
/* Initialise SCTP socket and bind to all interfaces
* On error caller must run dlm_close_sock() for the
* listen connection socket.
*/
static int sctp_listen_for_all(struct listen_connection *con)
{
struct socket *sock = NULL;
int result = -EINVAL;
log_print("Using SCTP for communications");
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, IPPROTO_SCTP, &sock);
if (result < 0) {
log_print("Can't create comms socket, check SCTP is loaded");
goto out;
}
sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
sock_set_mark(sock->sk, dlm_config.ci_mark);
sctp_sock_set_nodelay(sock->sk);
add_listen_sock(sock, con);
/* Bind to all addresses. */
result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port);
if (result < 0)
goto out;
result = sock->ops->listen(sock, 5);
if (result < 0) {
log_print("Can't set socket listening");
goto out;
}
return 0;
out:
return result;
}
static int tcp_listen_for_all(void)
{
/* We don't support multi-homed hosts */
if (dlm_local_count > 1) {
log_print("TCP protocol can't handle multi-homed hosts, "
"try SCTP");
return -EINVAL;
}
log_print("Using TCP for communications");
return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]);
}
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
gfp_t allocation)
{
@ -1528,19 +1252,37 @@ static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
{
struct writequeue_entry *e;
struct dlm_msg *msg;
bool sleepable;
msg = kzalloc(sizeof(*msg), allocation);
if (!msg)
return NULL;
/* this mutex is being used as a wait to avoid multiple "fast"
* new writequeue page list entry allocs in new_wq_entry in
* normal operation which is sleepable context. Without it
* we could end in multiple writequeue entries with one
* dlm message because multiple callers were waiting at
* the writequeue_lock in new_wq_entry().
*/
sleepable = gfpflags_normal_context(allocation);
if (sleepable)
mutex_lock(&con->wq_alloc);
kref_init(&msg->ref);
e = new_wq_entry(con, len, allocation, ppc, cb, mh);
if (!e) {
if (sleepable)
mutex_unlock(&con->wq_alloc);
kfree(msg);
return NULL;
}
if (sleepable)
mutex_unlock(&con->wq_alloc);
msg->ppc = *ppc;
msg->len = len;
msg->entry = e;
@ -1646,10 +1388,9 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
/* Send a message */
static void send_to_sock(struct connection *con)
{
int ret = 0;
const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
struct writequeue_entry *e;
int len, offset;
int len, offset, ret;
int count = 0;
mutex_lock(&con->sock_mutex);
@ -1658,7 +1399,8 @@ static void send_to_sock(struct connection *con)
spin_lock(&con->writequeue_lock);
for (;;) {
if (list_empty(&con->writequeue))
e = con_next_wq(con);
if (!e)
break;
e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
@ -1667,25 +1409,22 @@ static void send_to_sock(struct connection *con)
BUG_ON(len == 0 && e->users == 0);
spin_unlock(&con->writequeue_lock);
ret = 0;
if (len) {
ret = kernel_sendpage(con->sock, e->page, offset, len,
msg_flags);
if (ret == -EAGAIN || ret == 0) {
if (ret == -EAGAIN &&
test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
!test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
/* Notify TCP that we're limited by the
* application window size.
*/
set_bit(SOCK_NOSPACE, &con->sock->flags);
con->sock->sk->sk_write_pending++;
}
cond_resched();
goto out;
} else if (ret < 0)
goto out;
}
ret = kernel_sendpage(con->sock, e->page, offset, len,
msg_flags);
if (ret == -EAGAIN || ret == 0) {
if (ret == -EAGAIN &&
test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
!test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
/* Notify TCP that we're limited by the
* application window size.
*/
set_bit(SOCK_NOSPACE, &con->sock->flags);
con->sock->sk->sk_write_pending++;
}
cond_resched();
goto out;
} else if (ret < 0)
goto out;
/* Don't starve people filling buffers */
if (++count >= MAX_SEND_MSG_COUNT) {
@ -1770,12 +1509,9 @@ int dlm_lowcomms_close(int nodeid)
static void process_recv_sockets(struct work_struct *work)
{
struct connection *con = container_of(work, struct connection, rwork);
int err;
clear_bit(CF_READ_PENDING, &con->flags);
do {
err = receive_from_sock(con);
} while (!err);
receive_from_sock(con);
}
static void process_listen_recv_socket(struct work_struct *work)
@ -1783,6 +1519,74 @@ static void process_listen_recv_socket(struct work_struct *work)
accept_from_sock(&listen_con);
}
/* Create an outgoing socket for con->nodeid and start connecting it, using
 * the callbacks of the active dlm_proto_ops.
 *
 * The call site in process_send_sockets() takes con->sock_mutex around this
 * function. Nothing is returned: on an error that is not in the fatal list
 * at the end, the function sleeps one second and calls
 * lowcomms_connect_sock() (presumably to schedule another attempt -- that
 * helper is not part of this hunk).
 */
static void dlm_connect(struct connection *con)
{
struct sockaddr_storage addr;
int result, addr_len;
struct socket *sock;
unsigned int mark;
/* The counter is bumped on every call; give up once it has passed
 * MAX_CONNECT_RETRIES.
 */
if (con->retries++ > MAX_CONNECT_RETRIES)
return;
/* Some odd races can cause double-connects, ignore them */
if (con->sock) {
log_print("node %d already connected.", con->nodeid);
return;
}
/* look up the peer address and the socket mark to use for it */
memset(&addr, 0, sizeof(addr));
result = nodeid_to_addr(con->nodeid, &addr, NULL,
dlm_proto_ops->try_new_addr, &mark);
if (result < 0) {
log_print("no address for nodeid %d", con->nodeid);
return;
}
/* Create a socket to communicate with */
result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
SOCK_STREAM, dlm_proto_ops->proto, &sock);
if (result < 0)
goto socket_err;
sock_set_mark(sock->sk, mark);
dlm_proto_ops->sockopts(sock);
/* from here on the error path cleans up through con->sock */
add_sock(sock, con);
result = dlm_proto_ops->bind(sock);
if (result < 0)
goto add_sock_err;
log_print_ratelimited("connecting to %d", con->nodeid);
make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
addr_len);
if (result < 0)
goto add_sock_err;
return;
add_sock_err:
dlm_close_sock(&con->sock);
socket_err:
/*
* Some errors are fatal and this list might need adjusting. For other
* errors we try again until the max number of retries is reached.
*/
if (result != -EHOSTUNREACH &&
result != -ENETUNREACH &&
result != -ENETDOWN &&
result != -EINVAL &&
result != -EPROTONOSUPPORT) {
log_print("connect %d try %d error %d", con->nodeid,
con->retries, result);
msleep(1000);
lowcomms_connect_sock(con);
}
}
/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
@ -1797,11 +1601,15 @@ static void process_send_sockets(struct work_struct *work)
dlm_midcomms_unack_msg_resend(con->nodeid);
}
if (con->sock == NULL) { /* not mutex protected so check it inside too */
if (con->sock == NULL) {
if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
msleep(1000);
con->connect_action(con);
mutex_lock(&con->sock_mutex);
dlm_connect(con);
mutex_unlock(&con->sock_mutex);
}
if (!list_empty(&con->writequeue))
send_to_sock(con);
}
@ -1840,8 +1648,8 @@ static int work_start(void)
static void shutdown_conn(struct connection *con)
{
if (con->shutdown_action)
con->shutdown_action(con);
if (dlm_proto_ops->shutdown_action)
dlm_proto_ops->shutdown_action(con);
}
void dlm_lowcomms_shutdown(void)
@ -1948,8 +1756,198 @@ void dlm_lowcomms_stop(void)
srcu_read_unlock(&connections_srcu, idx);
work_stop();
deinit_local();
dlm_proto_ops = NULL;
}
/* Create, configure, bind and start the listen socket for the protocol
 * selected in dlm_proto_ops.
 *
 * Returns 0 on success or a negative errno. On failure the socket created
 * here has been released exactly once and must not be released again
 * through the local pointer.
 */
static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		/* No socket was created, so "sock" is not a valid pointer:
		 * return directly instead of jumping to the release label.
		 */
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	save_listen_callbacks(sock);
	add_listen_sock(sock, &listen_con);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		/* The socket is owned by listen_con after add_listen_sock();
		 * dlm_close_sock() is the cleanup for it, so releasing "sock"
		 * again here would release the same socket twice.
		 */
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	/* only reached while the socket is not yet attached to listen_con */
	sock_release(sock);
	return result;
}
static int dlm_tcp_bind(struct socket *sock)
{
struct sockaddr_storage src_addr;
int result, addr_len;
/* Bind to our cluster-known address connecting to avoid
* routing problems.
*/
memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
make_sockaddr(&src_addr, 0, &addr_len);
result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
addr_len);
if (result < 0) {
/* This *may* not indicate a critical error */
log_print("could not bind for connect: %d", result);
}
return 0;
}
/* Start a non-blocking TCP connect on @sock.
 *
 * -EINPROGRESS is the expected answer for a non-blocking connect and is
 * reported as success (0); every other result is passed through unchanged.
 * @con is not used by the TCP variant.
 */
static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	int rv = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);

	if (rv == -EINPROGRESS)
		return 0;

	return rv;
}
/* Check that TCP can be used with the configured local addresses.
 *
 * Returns 0 when at most one local address is configured, -EINVAL
 * otherwise, because multi-homed hosts are not supported over TCP.
 */
static int dlm_tcp_listen_validate(void)
{
	if (dlm_local_count <= 1)
		return 0;

	log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
	return -EINVAL;
}
/* Socket options for TCP sockets: disable Nagle's algorithm. */
static void dlm_tcp_sockopts(struct socket *sock)
{
	struct sock *sk = sock->sk;

	tcp_sock_set_nodelay(sk);
}
/* Socket options for the TCP listen socket: the common TCP options from
 * dlm_tcp_sockopts(), followed by address reuse.
 */
static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);

	sock_set_reuseaddr(sock->sk);
}
/* Bind the TCP listen socket to our first local address on the configured
 * dlm port. Returns the result of the socket's bind operation.
 *
 * Note that make_sockaddr() is applied to dlm_local_addr[0] itself, not to
 * a copy of it.
 */
static int dlm_tcp_listen_bind(struct socket *sock)
{
	struct sockaddr_storage *local = dlm_local_addr[0];
	int len;

	make_sockaddr(local, dlm_config.ci_tcp_port, &len);

	return sock->ops->bind(sock, (struct sockaddr *)local, len);
}
/* Operations table for TCP. try_new_addr is not listed and is therefore
 * zero-initialized (false). TCP is the only table that provides the
 * optional shutdown_action and eof_condition hooks.
 */
static const struct dlm_proto_ops dlm_tcp_ops = {
.name = "TCP",
.proto = IPPROTO_TCP,
.connect = dlm_tcp_connect,
.sockopts = dlm_tcp_sockopts,
.bind = dlm_tcp_bind,
.listen_validate = dlm_tcp_listen_validate,
.listen_sockopts = dlm_tcp_listen_sockopts,
.listen_bind = dlm_tcp_listen_bind,
.shutdown_action = dlm_tcp_shutdown,
.eof_condition = tcp_eof_condition,
};
static int dlm_sctp_bind(struct socket *sock)
{
return sctp_bind_addrs(sock, 0);
}
/* Connect an SCTP socket to @addr.
 *
 * O_NONBLOCK does not work for this connect call, so the send timeout is
 * set to 5 around it to bound how long it may take, and reset to 0
 * afterwards. A negative connect result is returned as is. Otherwise
 * CF_CONNECTED is set on @con (logging only when it was not set before)
 * and 0 is returned.
 */
static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int rv;

	sock_set_sndtimeo(sock->sk, 5);
	rv = sock->ops->connect(sock, addr, addr_len, 0);
	sock_set_sndtimeo(sock->sk, 0);

	if (rv >= 0) {
		if (!test_and_set_bit(CF_CONNECTED, &con->flags))
			log_print("successful connected to node %d", con->nodeid);
		rv = 0;
	}

	return rv;
}
/* Check that SCTP is usable before listening.
 *
 * Returns -EOPNOTSUPP when the kernel was built without CONFIG_IP_SCTP.
 * Otherwise the "sctp" module is requested and 0 is returned; the result
 * of request_module() is not checked.
 */
static int dlm_sctp_listen_validate(void)
{
	if (IS_ENABLED(CONFIG_IP_SCTP)) {
		request_module("sctp");
		return 0;
	}

	log_print("SCTP is not enabled by this kernel");
	return -EOPNOTSUPP;
}
/* Bind the SCTP listen socket through sctp_bind_addrs() on the configured
 * dlm port and return its result.
 */
static int dlm_sctp_bind_listen(struct socket *sock)
{
	int rv = sctp_bind_addrs(sock, dlm_config.ci_tcp_port);

	return rv;
}
/* Socket options for SCTP sockets: disable Nagle's algorithm and set the
 * receive buffer size to NEEDED_RMEM.
 */
static void dlm_sctp_sockopts(struct socket *sock)
{
	struct sock *sk = sock->sk;

	sctp_sock_set_nodelay(sk);
	sock_set_rcvbuf(sk, NEEDED_RMEM);
}
/* Operations table for SCTP. The same sockopts callback serves both
 * outgoing and listen sockets. shutdown_action and eof_condition are not
 * listed and are therefore NULL.
 */
static const struct dlm_proto_ops dlm_sctp_ops = {
.name = "SCTP",
.proto = IPPROTO_SCTP,
.try_new_addr = true,
.connect = dlm_sctp_connect,
.sockopts = dlm_sctp_sockopts,
.bind = dlm_sctp_bind,
.listen_validate = dlm_sctp_listen_validate,
.listen_sockopts = dlm_sctp_sockopts,
.listen_bind = dlm_sctp_bind_listen,
};
int dlm_lowcomms_start(void)
{
int error = -EINVAL;
@ -1976,23 +1974,27 @@ int dlm_lowcomms_start(void)
/* Start listening */
switch (dlm_config.ci_protocol) {
case DLM_PROTO_TCP:
error = tcp_listen_for_all();
dlm_proto_ops = &dlm_tcp_ops;
break;
case DLM_PROTO_SCTP:
error = sctp_listen_for_all(&listen_con);
dlm_proto_ops = &dlm_sctp_ops;
break;
default:
log_print("Invalid protocol identifier %d set",
dlm_config.ci_protocol);
error = -EINVAL;
break;
goto fail_proto_ops;
}
error = dlm_listen_for_all();
if (error)
goto fail_unlisten;
goto fail_listen;
return 0;
fail_unlisten:
fail_listen:
dlm_proto_ops = NULL;
fail_proto_ops:
dlm_allow_conn = 0;
dlm_close_sock(&listen_con.sock);
work_stop();

View File

@ -46,6 +46,7 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
int dlm_lowcomms_connect_node(int nodeid);
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
void dlm_midcomms_receive_done(int nodeid);
#endif /* __LOWCOMMS_DOT_H__ */

View File

@ -443,8 +443,10 @@ static int ping_members(struct dlm_ls *ls)
list_for_each_entry(memb, &ls->ls_nodes, list) {
error = dlm_recovery_stopped(ls);
if (error)
if (error) {
error = -EINTR;
break;
}
error = dlm_rcom_status(ls, memb->nodeid, 0);
if (error)
break;

View File

@ -109,12 +109,6 @@
* compatibility. There exists better ways to make a better handling.
* However this should be changed in the next major version bump of dlm.
*
* Ack handling:
*
* Currently we send an ack message for every dlm message. However we
* can ack multiple dlm messages with one ack by just delaying the ack
* message. Will reduce some traffic but makes the drop detection slower.
*
* Tail Size checking:
*
* There exists a message tail payload in e.g. DLM_MSG however we don't
@ -169,6 +163,7 @@ struct midcomms_node {
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
#define DLM_NODE_ULP_DELIVERED 4
unsigned long flags;
wait_queue_head_t shutdown_wait;
@ -480,11 +475,12 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
{
if (seq == node->seq_next) {
node->seq_next++;
/* send ack before fin */
dlm_send_ack(node->nodeid, node->seq_next);
switch (p->header.h_cmd) {
case DLM_FIN:
/* send ack before fin */
dlm_send_ack(node->nodeid, node->seq_next);
spin_lock(&node->state_lock);
pr_debug("receive fin msg from node %d with state %s\n",
node->nodeid, dlm_state_str(node->state));
@ -534,6 +530,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
default:
WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
dlm_receive_buffer(p, node->nodeid);
set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
break;
}
} else {
@ -933,6 +930,49 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
return ret;
}
/* Called by lowcomms after a receive pass for @nodeid has finished.
 *
 * Sends one ack carrying the node's current seq_next, but only if all of
 * the following hold: the node exists, it speaks DLM_VERSION_3_2, the
 * DLM_NODE_ULP_DELIVERED flag was set (it is cleared here), and the node
 * state is DLM_ESTABLISHED. In every other case nothing is sent.
 */
void dlm_midcomms_receive_done(int nodeid)
{
	struct midcomms_node *node;
	int established;
	int idx;

	idx = srcu_read_lock(&nodes_srcu);

	node = nodeid2node(nodeid, 0);
	if (!node)
		goto out;

	/* old protocol, we do nothing */
	if (node->version != DLM_VERSION_3_2)
		goto out;

	/* do nothing if nothing stateful was delivered to the ulp */
	if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED, &node->flags))
		goto out;

	/* the state is sampled under state_lock, the ack is sent without it */
	spin_lock(&node->state_lock);
	established = (node->state == DLM_ESTABLISHED);
	spin_unlock(&node->state_lock);

	/* we only ack if state is ESTABLISHED; FIN has its own ack send */
	if (established)
		dlm_send_ack(node->nodeid, node->seq_next);

out:
	srcu_read_unlock(&nodes_srcu, idx);
}
void dlm_midcomms_unack_msg_resend(int nodeid)
{
struct midcomms_node *node;

View File

@ -89,22 +89,15 @@ static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type,
return 0;
}
static void _send_rcom(struct dlm_ls *ls, struct dlm_rcom *rc)
static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc)
{
dlm_rcom_out(rc);
}
static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
struct dlm_rcom *rc)
{
_send_rcom(ls, rc);
dlm_midcomms_commit_mhandle(mh);
}
static void send_rcom_stateless(struct dlm_ls *ls, struct dlm_msg *msg,
struct dlm_rcom *rc)
static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc)
{
_send_rcom(ls, rc);
dlm_rcom_out(rc);
dlm_lowcomms_commit_msg(msg);
dlm_lowcomms_put_msg(msg);
}
@ -204,7 +197,7 @@ retry:
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
send_rcom_stateless(ls, msg, rc);
send_rcom_stateless(msg, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
@ -287,7 +280,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
spin_unlock(&ls->ls_recover_lock);
do_send:
send_rcom_stateless(ls, msg, rc);
send_rcom_stateless(msg, rc);
}
static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
@ -327,7 +320,7 @@ retry:
allow_sync_reply(ls, &rc->rc_id);
memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE);
send_rcom_stateless(ls, msg, rc);
send_rcom_stateless(msg, rc);
error = dlm_wait_function(ls, &rcom_response);
disallow_sync_reply(ls);
@ -356,7 +349,7 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
nodeid);
send_rcom_stateless(ls, msg, rc);
send_rcom_stateless(msg, rc);
}
int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
@ -373,7 +366,7 @@ int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
memcpy(rc->rc_buf, r->res_name, r->res_length);
rc->rc_id = (unsigned long) r->res_id;
send_rcom(ls, mh, rc);
send_rcom(mh, rc);
out:
return error;
}
@ -404,7 +397,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq;
send_rcom(ls, mh, rc);
send_rcom(mh, rc);
}
static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
@ -461,7 +454,7 @@ int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
pack_rcom_lock(r, lkb, rl);
rc->rc_id = (unsigned long) r;
send_rcom(ls, mh, rc);
send_rcom(mh, rc);
out:
return error;
}
@ -487,7 +480,7 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
rc->rc_id = rc_in->rc_id;
rc->rc_seq_reply = rc_in->rc_seq;
send_rcom(ls, mh, rc);
send_rcom(mh, rc);
}
/* If the lockspace doesn't exist then still send a status message

View File

@ -125,8 +125,10 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
dlm_recover_waiters_pre(ls);
error = dlm_recovery_stopped(ls);
if (error)
if (error) {
error = -EINTR;
goto fail;
}
if (neg || dlm_no_directory(ls)) {
/*