mirror of
https://mirrors.bfsu.edu.cn/git/linux.git
synced 2024-11-24 20:54:10 +08:00
1d6e8131cf
This patch converts the DLM TCP lowcomms to use workqueues rather than using its own daemon functions. Simultaneously removing a lot of code and making it more scalable on multi-processor machines. Signed-Off-By: Patrick Caulfield <pcaulfie@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
1211 lines
29 KiB
C
1211 lines
29 KiB
C
/******************************************************************************
|
|
*******************************************************************************
|
|
**
|
|
** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
|
|
** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
|
|
**
|
|
** This copyrighted material is made available to anyone wishing to use,
|
|
** modify, copy, or redistribute it subject to the terms and conditions
|
|
** of the GNU General Public License v.2.
|
|
**
|
|
*******************************************************************************
|
|
******************************************************************************/
|
|
|
|
/*
|
|
* lowcomms.c
|
|
*
|
|
* This is the "low-level" comms layer.
|
|
*
|
|
* It is responsible for sending/receiving messages
|
|
* from other nodes in the cluster.
|
|
*
|
|
* Cluster nodes are referred to by their nodeids. nodeids are
|
|
* simply 32 bit numbers to the locking module - if they need to
|
|
 * be expanded for the cluster infrastructure then that is its
|
|
* responsibility. It is this layer's
|
|
* responsibility to resolve these into IP address or
|
|
* whatever it needs for inter-node communication.
|
|
*
|
|
* The comms level is two kernel threads that deal mainly with
|
|
* the receiving of messages from other nodes and passing them
|
|
* up to the mid-level comms layer (which understands the
|
|
* message format) for execution by the locking core, and
|
|
* a send thread which does all the setting up of connections
|
|
* to remote nodes and the sending of data. Threads are not allowed
|
|
* to send their own data because it may cause them to wait in times
|
|
* of high load. Also, this way, the sending thread can collect together
|
|
* messages bound for one node and send them in one block.
|
|
*
|
|
* I don't see any problem with the recv thread executing the locking
|
|
* code on behalf of remote processes as the locking code is
|
|
* short, efficient and never (well, hardly ever) waits.
|
|
*
|
|
*/
|
|
|
|
#include <asm/ioctls.h>
|
|
#include <net/sock.h>
|
|
#include <net/tcp.h>
|
|
#include <net/sctp/user.h>
|
|
#include <linux/pagemap.h>
|
|
#include <linux/socket.h>
|
|
#include <linux/idr.h>
|
|
|
|
#include "dlm_internal.h"
|
|
#include "lowcomms.h"
|
|
#include "config.h"
|
|
#include "midcomms.h"
|
|
|
|
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
|
|
static int dlm_local_count;
|
|
static int dlm_local_nodeid;
|
|
|
|
/* One of these per connected node */
|
|
|
|
#define NI_INIT_PENDING 1
|
|
#define NI_WRITE_PENDING 2
|
|
|
|
struct nodeinfo {
|
|
spinlock_t lock;
|
|
sctp_assoc_t assoc_id;
|
|
unsigned long flags;
|
|
struct list_head write_list; /* nodes with pending writes */
|
|
struct list_head writequeue; /* outgoing writequeue_entries */
|
|
spinlock_t writequeue_lock;
|
|
int nodeid;
|
|
struct work_struct swork; /* Send workqueue */
|
|
struct work_struct lwork; /* Locking workqueue */
|
|
};
|
|
|
|
static DEFINE_IDR(nodeinfo_idr);
|
|
static DECLARE_RWSEM(nodeinfo_lock);
|
|
static int max_nodeid;
|
|
|
|
/* Bookkeeping for a power-of-two sized circular buffer (see cbuf_init) */
struct cbuf {
	unsigned int base;	/* offset of the first unconsumed byte */
	unsigned int len;	/* number of unconsumed bytes */
	unsigned int mask;	/* buffer size - 1, used to wrap offsets */
};
|
|
|
|
/* Just the one of these, now. But this struct keeps
|
|
the connection-specific variables together */
|
|
|
|
#define CF_READ_PENDING 1
|
|
|
|
struct connection {
|
|
struct socket *sock;
|
|
unsigned long flags;
|
|
struct page *rx_page;
|
|
atomic_t waiting_requests;
|
|
struct cbuf cb;
|
|
int eagain_flag;
|
|
struct work_struct work; /* Send workqueue */
|
|
};
|
|
|
|
/* An entry waiting to be sent */
|
|
|
|
struct writequeue_entry {
|
|
struct list_head list;
|
|
struct page *page;
|
|
int offset;
|
|
int len;
|
|
int end;
|
|
int users;
|
|
struct nodeinfo *ni;
|
|
};
|
|
|
|
static void cbuf_add(struct cbuf *cb, int n)
|
|
{
|
|
cb->len += n;
|
|
}
|
|
|
|
static int cbuf_data(struct cbuf *cb)
|
|
{
|
|
return ((cb->base + cb->len) & cb->mask);
|
|
}
|
|
|
|
static void cbuf_init(struct cbuf *cb, int size)
|
|
{
|
|
cb->base = cb->len = 0;
|
|
cb->mask = size-1;
|
|
}
|
|
|
|
static void cbuf_eat(struct cbuf *cb, int n)
|
|
{
|
|
cb->len -= n;
|
|
cb->base += n;
|
|
cb->base &= cb->mask;
|
|
}
|
|
|
|
/* List of nodes which have writes pending */
|
|
static LIST_HEAD(write_nodes);
|
|
static DEFINE_SPINLOCK(write_nodes_lock);
|
|
|
|
|
|
/* Maximum number of incoming messages to process before
|
|
* doing a schedule()
|
|
*/
|
|
#define MAX_RX_MSG_COUNT 25
|
|
|
|
/* Work queues */
|
|
static struct workqueue_struct *recv_workqueue;
|
|
static struct workqueue_struct *send_workqueue;
|
|
static struct workqueue_struct *lock_workqueue;
|
|
|
|
/* The SCTP connection */
|
|
static struct connection sctp_con;
|
|
|
|
static void process_send_sockets(struct work_struct *work);
|
|
static void process_recv_sockets(struct work_struct *work);
|
|
static void process_lock_request(struct work_struct *work);
|
|
|
|
static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
|
|
{
|
|
struct sockaddr_storage addr;
|
|
int error;
|
|
|
|
if (!dlm_local_count)
|
|
return -1;
|
|
|
|
error = dlm_nodeid_to_addr(nodeid, &addr);
|
|
if (error)
|
|
return error;
|
|
|
|
if (dlm_local_addr[0]->ss_family == AF_INET) {
|
|
struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
|
|
struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
|
|
ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
|
|
} else {
|
|
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
|
|
struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
|
|
memcpy(&ret6->sin6_addr, &in6->sin6_addr,
|
|
sizeof(in6->sin6_addr));
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* If alloc is 0 here we will not attempt to allocate a new
|
|
nodeinfo struct */
|
|
static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
|
|
{
|
|
struct nodeinfo *ni;
|
|
int r;
|
|
int n;
|
|
|
|
down_read(&nodeinfo_lock);
|
|
ni = idr_find(&nodeinfo_idr, nodeid);
|
|
up_read(&nodeinfo_lock);
|
|
|
|
if (ni || !alloc)
|
|
return ni;
|
|
|
|
down_write(&nodeinfo_lock);
|
|
|
|
ni = idr_find(&nodeinfo_idr, nodeid);
|
|
if (ni)
|
|
goto out_up;
|
|
|
|
r = idr_pre_get(&nodeinfo_idr, alloc);
|
|
if (!r)
|
|
goto out_up;
|
|
|
|
ni = kmalloc(sizeof(struct nodeinfo), alloc);
|
|
if (!ni)
|
|
goto out_up;
|
|
|
|
r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
|
|
if (r) {
|
|
kfree(ni);
|
|
ni = NULL;
|
|
goto out_up;
|
|
}
|
|
if (n != nodeid) {
|
|
idr_remove(&nodeinfo_idr, n);
|
|
kfree(ni);
|
|
ni = NULL;
|
|
goto out_up;
|
|
}
|
|
memset(ni, 0, sizeof(struct nodeinfo));
|
|
spin_lock_init(&ni->lock);
|
|
INIT_LIST_HEAD(&ni->writequeue);
|
|
spin_lock_init(&ni->writequeue_lock);
|
|
INIT_WORK(&ni->lwork, process_lock_request);
|
|
INIT_WORK(&ni->swork, process_send_sockets);
|
|
ni->nodeid = nodeid;
|
|
|
|
if (nodeid > max_nodeid)
|
|
max_nodeid = nodeid;
|
|
out_up:
|
|
up_write(&nodeinfo_lock);
|
|
|
|
return ni;
|
|
}
|
|
|
|
/* Don't call this too often... */
|
|
static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
|
|
{
|
|
int i;
|
|
struct nodeinfo *ni;
|
|
|
|
for (i=1; i<=max_nodeid; i++) {
|
|
ni = nodeid2nodeinfo(i, 0);
|
|
if (ni && ni->assoc_id == assoc)
|
|
return ni;
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
/* Data or notification available on socket */
|
|
static void lowcomms_data_ready(struct sock *sk, int count_unused)
|
|
{
|
|
if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
|
|
queue_work(recv_workqueue, &sctp_con.work);
|
|
}
|
|
|
|
|
|
/* Add the port number to an IP6 or 4 sockaddr and return the address length.
|
|
Also padd out the struct with zeros to make comparisons meaningful */
|
|
|
|
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
|
|
int *addr_len)
|
|
{
|
|
struct sockaddr_in *local4_addr;
|
|
struct sockaddr_in6 *local6_addr;
|
|
|
|
if (!dlm_local_count)
|
|
return;
|
|
|
|
if (!port) {
|
|
if (dlm_local_addr[0]->ss_family == AF_INET) {
|
|
local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
|
|
port = be16_to_cpu(local4_addr->sin_port);
|
|
} else {
|
|
local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
|
|
port = be16_to_cpu(local6_addr->sin6_port);
|
|
}
|
|
}
|
|
|
|
saddr->ss_family = dlm_local_addr[0]->ss_family;
|
|
if (dlm_local_addr[0]->ss_family == AF_INET) {
|
|
struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
|
|
in4_addr->sin_port = cpu_to_be16(port);
|
|
memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
|
|
memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
|
|
sizeof(struct sockaddr_in));
|
|
*addr_len = sizeof(struct sockaddr_in);
|
|
} else {
|
|
struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
|
|
in6_addr->sin6_port = cpu_to_be16(port);
|
|
memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
|
|
sizeof(struct sockaddr_in6));
|
|
*addr_len = sizeof(struct sockaddr_in6);
|
|
}
|
|
}
|
|
|
|
/* Close the connection and tidy up */
|
|
static void close_connection(void)
|
|
{
|
|
if (sctp_con.sock) {
|
|
sock_release(sctp_con.sock);
|
|
sctp_con.sock = NULL;
|
|
}
|
|
|
|
if (sctp_con.rx_page) {
|
|
__free_page(sctp_con.rx_page);
|
|
sctp_con.rx_page = NULL;
|
|
}
|
|
}
|
|
|
|
/* We only send shutdown messages to nodes that are not part of the cluster */
|
|
static void send_shutdown(sctp_assoc_t associd)
|
|
{
|
|
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
|
|
struct msghdr outmessage;
|
|
struct cmsghdr *cmsg;
|
|
struct sctp_sndrcvinfo *sinfo;
|
|
int ret;
|
|
|
|
outmessage.msg_name = NULL;
|
|
outmessage.msg_namelen = 0;
|
|
outmessage.msg_control = outcmsg;
|
|
outmessage.msg_controllen = sizeof(outcmsg);
|
|
outmessage.msg_flags = MSG_EOR;
|
|
|
|
cmsg = CMSG_FIRSTHDR(&outmessage);
|
|
cmsg->cmsg_level = IPPROTO_SCTP;
|
|
cmsg->cmsg_type = SCTP_SNDRCV;
|
|
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
|
|
outmessage.msg_controllen = cmsg->cmsg_len;
|
|
sinfo = CMSG_DATA(cmsg);
|
|
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
|
|
|
|
sinfo->sinfo_flags |= MSG_EOF;
|
|
sinfo->sinfo_assoc_id = associd;
|
|
|
|
ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
|
|
|
|
if (ret != 0)
|
|
log_print("send EOF to node failed: %d", ret);
|
|
}
|
|
|
|
|
|
/* INIT failed but we don't know which node...
|
|
restart INIT on all pending nodes */
|
|
static void init_failed(void)
|
|
{
|
|
int i;
|
|
struct nodeinfo *ni;
|
|
|
|
for (i=1; i<=max_nodeid; i++) {
|
|
ni = nodeid2nodeinfo(i, 0);
|
|
if (!ni)
|
|
continue;
|
|
|
|
if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
|
|
ni->assoc_id = 0;
|
|
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
|
|
spin_lock_bh(&write_nodes_lock);
|
|
list_add_tail(&ni->write_list, &write_nodes);
|
|
spin_unlock_bh(&write_nodes_lock);
|
|
queue_work(send_workqueue, &ni->swork);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Something happened to an association */
|
|
static void process_sctp_notification(struct msghdr *msg, char *buf)
|
|
{
|
|
union sctp_notification *sn = (union sctp_notification *)buf;
|
|
|
|
if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
|
|
switch (sn->sn_assoc_change.sac_state) {
|
|
|
|
case SCTP_COMM_UP:
|
|
case SCTP_RESTART:
|
|
{
|
|
/* Check that the new node is in the lockspace */
|
|
struct sctp_prim prim;
|
|
mm_segment_t fs;
|
|
int nodeid;
|
|
int prim_len, ret;
|
|
int addr_len;
|
|
struct nodeinfo *ni;
|
|
|
|
/* This seems to happen when we received a connection
|
|
* too early... or something... anyway, it happens but
|
|
* we always seem to get a real message too, see
|
|
* receive_from_sock */
|
|
|
|
if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
|
|
log_print("COMM_UP for invalid assoc ID %d",
|
|
(int)sn->sn_assoc_change.sac_assoc_id);
|
|
init_failed();
|
|
return;
|
|
}
|
|
memset(&prim, 0, sizeof(struct sctp_prim));
|
|
prim_len = sizeof(struct sctp_prim);
|
|
prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
|
|
|
|
fs = get_fs();
|
|
set_fs(get_ds());
|
|
ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
|
|
IPPROTO_SCTP,
|
|
SCTP_PRIMARY_ADDR,
|
|
(char*)&prim,
|
|
&prim_len);
|
|
set_fs(fs);
|
|
if (ret < 0) {
|
|
struct nodeinfo *ni;
|
|
|
|
log_print("getsockopt/sctp_primary_addr on "
|
|
"new assoc %d failed : %d",
|
|
(int)sn->sn_assoc_change.sac_assoc_id,
|
|
ret);
|
|
|
|
/* Retry INIT later */
|
|
ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
|
|
if (ni)
|
|
clear_bit(NI_INIT_PENDING, &ni->flags);
|
|
return;
|
|
}
|
|
make_sockaddr(&prim.ssp_addr, 0, &addr_len);
|
|
if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
|
|
log_print("reject connect from unknown addr");
|
|
send_shutdown(prim.ssp_assoc_id);
|
|
return;
|
|
}
|
|
|
|
ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
|
|
if (!ni)
|
|
return;
|
|
|
|
/* Save the assoc ID */
|
|
ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
|
|
|
|
log_print("got new/restarted association %d nodeid %d",
|
|
(int)sn->sn_assoc_change.sac_assoc_id, nodeid);
|
|
|
|
/* Send any pending writes */
|
|
clear_bit(NI_INIT_PENDING, &ni->flags);
|
|
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
|
|
spin_lock_bh(&write_nodes_lock);
|
|
list_add_tail(&ni->write_list, &write_nodes);
|
|
spin_unlock_bh(&write_nodes_lock);
|
|
queue_work(send_workqueue, &ni->swork);
|
|
}
|
|
}
|
|
break;
|
|
|
|
case SCTP_COMM_LOST:
|
|
case SCTP_SHUTDOWN_COMP:
|
|
{
|
|
struct nodeinfo *ni;
|
|
|
|
ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
|
|
if (ni) {
|
|
spin_lock(&ni->lock);
|
|
ni->assoc_id = 0;
|
|
spin_unlock(&ni->lock);
|
|
}
|
|
}
|
|
break;
|
|
|
|
/* We don't know which INIT failed, so clear the PENDING flags
|
|
* on them all. if assoc_id is zero then it will then try
|
|
* again */
|
|
|
|
case SCTP_CANT_STR_ASSOC:
|
|
{
|
|
log_print("Can't start SCTP association - retrying");
|
|
init_failed();
|
|
}
|
|
break;
|
|
|
|
default:
|
|
log_print("unexpected SCTP assoc change id=%d state=%d",
|
|
(int)sn->sn_assoc_change.sac_assoc_id,
|
|
sn->sn_assoc_change.sac_state);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Data received from remote end */
|
|
static int receive_from_sock(void)
|
|
{
|
|
int ret = 0;
|
|
struct msghdr msg;
|
|
struct kvec iov[2];
|
|
unsigned len;
|
|
int r;
|
|
struct sctp_sndrcvinfo *sinfo;
|
|
struct cmsghdr *cmsg;
|
|
struct nodeinfo *ni;
|
|
|
|
/* These two are marginally too big for stack allocation, but this
|
|
* function is (currently) only called by dlm_recvd so static should be
|
|
* OK.
|
|
*/
|
|
static struct sockaddr_storage msgname;
|
|
static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
|
|
|
|
if (sctp_con.sock == NULL)
|
|
goto out;
|
|
|
|
if (sctp_con.rx_page == NULL) {
|
|
/*
|
|
* This doesn't need to be atomic, but I think it should
|
|
* improve performance if it is.
|
|
*/
|
|
sctp_con.rx_page = alloc_page(GFP_ATOMIC);
|
|
if (sctp_con.rx_page == NULL)
|
|
goto out_resched;
|
|
cbuf_init(&sctp_con.cb, PAGE_CACHE_SIZE);
|
|
}
|
|
|
|
memset(&incmsg, 0, sizeof(incmsg));
|
|
memset(&msgname, 0, sizeof(msgname));
|
|
|
|
msg.msg_name = &msgname;
|
|
msg.msg_namelen = sizeof(msgname);
|
|
msg.msg_flags = 0;
|
|
msg.msg_control = incmsg;
|
|
msg.msg_controllen = sizeof(incmsg);
|
|
msg.msg_iovlen = 1;
|
|
|
|
/* I don't see why this circular buffer stuff is necessary for SCTP
|
|
* which is a packet-based protocol, but the whole thing breaks under
|
|
* load without it! The overhead is minimal (and is in the TCP lowcomms
|
|
* anyway, of course) so I'll leave it in until I can figure out what's
|
|
* really happening.
|
|
*/
|
|
|
|
/*
|
|
* iov[0] is the bit of the circular buffer between the current end
|
|
* point (cb.base + cb.len) and the end of the buffer.
|
|
*/
|
|
iov[0].iov_len = sctp_con.cb.base - cbuf_data(&sctp_con.cb);
|
|
iov[0].iov_base = page_address(sctp_con.rx_page) +
|
|
cbuf_data(&sctp_con.cb);
|
|
iov[1].iov_len = 0;
|
|
|
|
/*
|
|
* iov[1] is the bit of the circular buffer between the start of the
|
|
* buffer and the start of the currently used section (cb.base)
|
|
*/
|
|
if (cbuf_data(&sctp_con.cb) >= sctp_con.cb.base) {
|
|
iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&sctp_con.cb);
|
|
iov[1].iov_len = sctp_con.cb.base;
|
|
iov[1].iov_base = page_address(sctp_con.rx_page);
|
|
msg.msg_iovlen = 2;
|
|
}
|
|
len = iov[0].iov_len + iov[1].iov_len;
|
|
|
|
r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
|
|
MSG_NOSIGNAL | MSG_DONTWAIT);
|
|
if (ret <= 0)
|
|
goto out_close;
|
|
|
|
msg.msg_control = incmsg;
|
|
msg.msg_controllen = sizeof(incmsg);
|
|
cmsg = CMSG_FIRSTHDR(&msg);
|
|
sinfo = CMSG_DATA(cmsg);
|
|
|
|
if (msg.msg_flags & MSG_NOTIFICATION) {
|
|
process_sctp_notification(&msg, page_address(sctp_con.rx_page));
|
|
return 0;
|
|
}
|
|
|
|
/* Is this a new association ? */
|
|
ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
|
|
if (ni) {
|
|
ni->assoc_id = sinfo->sinfo_assoc_id;
|
|
if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
|
|
|
|
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
|
|
spin_lock_bh(&write_nodes_lock);
|
|
list_add_tail(&ni->write_list, &write_nodes);
|
|
spin_unlock_bh(&write_nodes_lock);
|
|
queue_work(send_workqueue, &ni->swork);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* INIT sends a message with length of 1 - ignore it */
|
|
if (r == 1)
|
|
return 0;
|
|
|
|
cbuf_add(&sctp_con.cb, ret);
|
|
// PJC: TODO: Add to node's workqueue....can we ??
|
|
ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
|
|
page_address(sctp_con.rx_page),
|
|
sctp_con.cb.base, sctp_con.cb.len,
|
|
PAGE_CACHE_SIZE);
|
|
if (ret < 0)
|
|
goto out_close;
|
|
cbuf_eat(&sctp_con.cb, ret);
|
|
|
|
out:
|
|
ret = 0;
|
|
goto out_ret;
|
|
|
|
out_resched:
|
|
lowcomms_data_ready(sctp_con.sock->sk, 0);
|
|
ret = 0;
|
|
cond_resched();
|
|
goto out_ret;
|
|
|
|
out_close:
|
|
if (ret != -EAGAIN)
|
|
log_print("error reading from sctp socket: %d", ret);
|
|
out_ret:
|
|
return ret;
|
|
}
|
|
|
|
/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
|
|
static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
|
|
{
|
|
mm_segment_t fs;
|
|
int result = 0;
|
|
|
|
fs = get_fs();
|
|
set_fs(get_ds());
|
|
if (num == 1)
|
|
result = sctp_con.sock->ops->bind(sctp_con.sock,
|
|
(struct sockaddr *) addr,
|
|
addr_len);
|
|
else
|
|
result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
|
|
SCTP_SOCKOPT_BINDX_ADD,
|
|
(char *)addr, addr_len);
|
|
set_fs(fs);
|
|
|
|
if (result < 0)
|
|
log_print("Can't bind to port %d addr number %d",
|
|
dlm_config.ci_tcp_port, num);
|
|
|
|
return result;
|
|
}
|
|
|
|
static void init_local(void)
|
|
{
|
|
struct sockaddr_storage sas, *addr;
|
|
int i;
|
|
|
|
dlm_local_nodeid = dlm_our_nodeid();
|
|
|
|
for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
|
|
if (dlm_our_addr(&sas, i))
|
|
break;
|
|
|
|
addr = kmalloc(sizeof(*addr), GFP_KERNEL);
|
|
if (!addr)
|
|
break;
|
|
memcpy(addr, &sas, sizeof(*addr));
|
|
dlm_local_addr[dlm_local_count++] = addr;
|
|
}
|
|
}
|
|
|
|
/* Initialise SCTP socket and bind to all interfaces */
|
|
static int init_sock(void)
|
|
{
|
|
mm_segment_t fs;
|
|
struct socket *sock = NULL;
|
|
struct sockaddr_storage localaddr;
|
|
struct sctp_event_subscribe subscribe;
|
|
int result = -EINVAL, num = 1, i, addr_len;
|
|
|
|
if (!dlm_local_count) {
|
|
init_local();
|
|
if (!dlm_local_count) {
|
|
log_print("no local IP address has been set");
|
|
goto out;
|
|
}
|
|
}
|
|
|
|
result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
|
|
IPPROTO_SCTP, &sock);
|
|
if (result < 0) {
|
|
log_print("Can't create comms socket, check SCTP is loaded");
|
|
goto out;
|
|
}
|
|
|
|
/* Listen for events */
|
|
memset(&subscribe, 0, sizeof(subscribe));
|
|
subscribe.sctp_data_io_event = 1;
|
|
subscribe.sctp_association_event = 1;
|
|
subscribe.sctp_send_failure_event = 1;
|
|
subscribe.sctp_shutdown_event = 1;
|
|
subscribe.sctp_partial_delivery_event = 1;
|
|
|
|
fs = get_fs();
|
|
set_fs(get_ds());
|
|
result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
|
|
(char *)&subscribe, sizeof(subscribe));
|
|
set_fs(fs);
|
|
|
|
if (result < 0) {
|
|
log_print("Failed to set SCTP_EVENTS on socket: result=%d",
|
|
result);
|
|
goto create_delsock;
|
|
}
|
|
|
|
/* Init con struct */
|
|
sock->sk->sk_user_data = &sctp_con;
|
|
sctp_con.sock = sock;
|
|
sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
|
|
|
|
/* Bind to all interfaces. */
|
|
for (i = 0; i < dlm_local_count; i++) {
|
|
memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
|
|
make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
|
|
|
|
result = add_bind_addr(&localaddr, addr_len, num);
|
|
if (result)
|
|
goto create_delsock;
|
|
++num;
|
|
}
|
|
|
|
result = sock->ops->listen(sock, 5);
|
|
if (result < 0) {
|
|
log_print("Can't set socket listening");
|
|
goto create_delsock;
|
|
}
|
|
|
|
return 0;
|
|
|
|
create_delsock:
|
|
sock_release(sock);
|
|
sctp_con.sock = NULL;
|
|
out:
|
|
return result;
|
|
}
|
|
|
|
|
|
/*
 * Allocate an empty writequeue entry together with its data page.
 * offset, len, end and users start at 0; the ni field is left for the
 * caller to set.  Returns NULL if either allocation fails.
 */
static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
{
	struct writequeue_entry *e;

	e = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!e)
		return NULL;

	e->page = alloc_page(allocation);
	if (!e->page) {
		kfree(e);
		return NULL;
	}

	e->users = 0;
	e->end = 0;
	e->len = 0;
	e->offset = 0;

	return e;
}
|
|
|
|
/*
 * Reserve len bytes of send buffer for a message to nodeid.
 *
 * Space is taken from the last entry on the node's writequeue when it
 * has room, otherwise a new entry is allocated and appended.  On success
 * *ppc points at the reserved space and the returned handle must later be
 * passed to dlm_lowcomms_commit_buffer().  Returns NULL if the nodeinfo
 * or a new entry cannot be allocated.
 */
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct writequeue_entry *e;
	struct nodeinfo *ni;
	int offset = 0;
	int users = 0;

	ni = nodeid2nodeinfo(nodeid, allocation);
	if (!ni)
		return NULL;

	/* Try to append to the entry at the tail of the queue */
	spin_lock(&ni->writequeue_lock);
	e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &ni->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		/* Queue empty, or not enough room left in the last page */
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&ni->writequeue_lock);

	if (!e) {
		e = new_writequeue_entry(allocation);
		if (!e)
			return NULL;

		spin_lock(&ni->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->ni = ni;
		users = e->users++;
		list_add_tail(&e->list, &ni->writequeue);
		spin_unlock(&ni->writequeue_lock);
	}

	/* The first user of an entry maps its page */
	if (users == 0)
		kmap(e->page);
	*ppc = page_address(e->page) + offset;
	return e;
}
|
|
|
|
void dlm_lowcomms_commit_buffer(void *arg)
|
|
{
|
|
struct writequeue_entry *e = (struct writequeue_entry *) arg;
|
|
int users;
|
|
struct nodeinfo *ni = e->ni;
|
|
|
|
spin_lock(&ni->writequeue_lock);
|
|
users = --e->users;
|
|
if (users)
|
|
goto out;
|
|
e->len = e->end - e->offset;
|
|
kunmap(e->page);
|
|
spin_unlock(&ni->writequeue_lock);
|
|
|
|
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
|
|
spin_lock_bh(&write_nodes_lock);
|
|
list_add_tail(&ni->write_list, &write_nodes);
|
|
spin_unlock_bh(&write_nodes_lock);
|
|
|
|
queue_work(send_workqueue, &ni->swork);
|
|
}
|
|
return;
|
|
|
|
out:
|
|
spin_unlock(&ni->writequeue_lock);
|
|
return;
|
|
}
|
|
|
|
static void free_entry(struct writequeue_entry *e)
|
|
{
|
|
__free_page(e->page);
|
|
kfree(e);
|
|
}
|
|
|
|
/* Initiate an SCTP association. In theory we could just use sendmsg() on
|
|
the first IP address and it should work, but this allows us to set up the
|
|
association before sending any valuable data that we can't afford to lose.
|
|
It also keeps the send path clean as it can now always use the association ID */
|
|
static void initiate_association(int nodeid)
|
|
{
|
|
struct sockaddr_storage rem_addr;
|
|
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
|
|
struct msghdr outmessage;
|
|
struct cmsghdr *cmsg;
|
|
struct sctp_sndrcvinfo *sinfo;
|
|
int ret;
|
|
int addrlen;
|
|
char buf[1];
|
|
struct kvec iov[1];
|
|
struct nodeinfo *ni;
|
|
|
|
log_print("Initiating association with node %d", nodeid);
|
|
|
|
ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
|
|
if (!ni)
|
|
return;
|
|
|
|
if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
|
|
log_print("no address for nodeid %d", nodeid);
|
|
return;
|
|
}
|
|
|
|
make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
|
|
|
|
outmessage.msg_name = &rem_addr;
|
|
outmessage.msg_namelen = addrlen;
|
|
outmessage.msg_control = outcmsg;
|
|
outmessage.msg_controllen = sizeof(outcmsg);
|
|
outmessage.msg_flags = MSG_EOR;
|
|
|
|
iov[0].iov_base = buf;
|
|
iov[0].iov_len = 1;
|
|
|
|
/* Real INIT messages seem to cause trouble. Just send a 1 byte message
|
|
we can afford to lose */
|
|
cmsg = CMSG_FIRSTHDR(&outmessage);
|
|
cmsg->cmsg_level = IPPROTO_SCTP;
|
|
cmsg->cmsg_type = SCTP_SNDRCV;
|
|
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
|
|
sinfo = CMSG_DATA(cmsg);
|
|
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
|
|
sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
|
|
|
|
outmessage.msg_controllen = cmsg->cmsg_len;
|
|
ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
|
|
if (ret < 0) {
|
|
log_print("send INIT to node failed: %d", ret);
|
|
/* Try again later */
|
|
clear_bit(NI_INIT_PENDING, &ni->flags);
|
|
}
|
|
}
|
|
|
|
/* Send a message */
|
|
static void send_to_sock(struct nodeinfo *ni)
|
|
{
|
|
int ret = 0;
|
|
struct writequeue_entry *e;
|
|
int len, offset;
|
|
struct msghdr outmsg;
|
|
static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
|
|
struct cmsghdr *cmsg;
|
|
struct sctp_sndrcvinfo *sinfo;
|
|
struct kvec iov;
|
|
|
|
/* See if we need to init an association before we start
|
|
sending precious messages */
|
|
spin_lock(&ni->lock);
|
|
if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
|
|
spin_unlock(&ni->lock);
|
|
initiate_association(ni->nodeid);
|
|
return;
|
|
}
|
|
spin_unlock(&ni->lock);
|
|
|
|
outmsg.msg_name = NULL; /* We use assoc_id */
|
|
outmsg.msg_namelen = 0;
|
|
outmsg.msg_control = outcmsg;
|
|
outmsg.msg_controllen = sizeof(outcmsg);
|
|
outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
|
|
|
|
cmsg = CMSG_FIRSTHDR(&outmsg);
|
|
cmsg->cmsg_level = IPPROTO_SCTP;
|
|
cmsg->cmsg_type = SCTP_SNDRCV;
|
|
cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
|
|
sinfo = CMSG_DATA(cmsg);
|
|
memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
|
|
sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
|
|
sinfo->sinfo_assoc_id = ni->assoc_id;
|
|
outmsg.msg_controllen = cmsg->cmsg_len;
|
|
|
|
spin_lock(&ni->writequeue_lock);
|
|
for (;;) {
|
|
if (list_empty(&ni->writequeue))
|
|
break;
|
|
e = list_entry(ni->writequeue.next, struct writequeue_entry,
|
|
list);
|
|
len = e->len;
|
|
offset = e->offset;
|
|
BUG_ON(len == 0 && e->users == 0);
|
|
spin_unlock(&ni->writequeue_lock);
|
|
kmap(e->page);
|
|
|
|
ret = 0;
|
|
if (len) {
|
|
iov.iov_base = page_address(e->page)+offset;
|
|
iov.iov_len = len;
|
|
|
|
ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
|
|
len);
|
|
if (ret == -EAGAIN) {
|
|
sctp_con.eagain_flag = 1;
|
|
goto out;
|
|
} else if (ret < 0)
|
|
goto send_error;
|
|
} else {
|
|
/* Don't starve people filling buffers */
|
|
cond_resched();
|
|
}
|
|
|
|
spin_lock(&ni->writequeue_lock);
|
|
e->offset += ret;
|
|
e->len -= ret;
|
|
|
|
if (e->len == 0 && e->users == 0) {
|
|
list_del(&e->list);
|
|
kunmap(e->page);
|
|
free_entry(e);
|
|
continue;
|
|
}
|
|
}
|
|
spin_unlock(&ni->writequeue_lock);
|
|
out:
|
|
return;
|
|
|
|
send_error:
|
|
log_print("Error sending to node %d %d", ni->nodeid, ret);
|
|
spin_lock(&ni->lock);
|
|
if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
|
|
ni->assoc_id = 0;
|
|
spin_unlock(&ni->lock);
|
|
initiate_association(ni->nodeid);
|
|
} else
|
|
spin_unlock(&ni->lock);
|
|
|
|
return;
|
|
}
|
|
|
|
/* Try to send any messages that are pending */
|
|
static void process_output_queue(void)
|
|
{
|
|
struct list_head *list;
|
|
struct list_head *temp;
|
|
|
|
spin_lock_bh(&write_nodes_lock);
|
|
list_for_each_safe(list, temp, &write_nodes) {
|
|
struct nodeinfo *ni =
|
|
list_entry(list, struct nodeinfo, write_list);
|
|
clear_bit(NI_WRITE_PENDING, &ni->flags);
|
|
list_del(&ni->write_list);
|
|
|
|
spin_unlock_bh(&write_nodes_lock);
|
|
|
|
send_to_sock(ni);
|
|
spin_lock_bh(&write_nodes_lock);
|
|
}
|
|
spin_unlock_bh(&write_nodes_lock);
|
|
}
|
|
|
|
/* Called after we've had -EAGAIN and been woken up */
|
|
static void refill_write_queue(void)
|
|
{
|
|
int i;
|
|
|
|
for (i=1; i<=max_nodeid; i++) {
|
|
struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
|
|
|
|
if (ni) {
|
|
if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
|
|
spin_lock_bh(&write_nodes_lock);
|
|
list_add_tail(&ni->write_list, &write_nodes);
|
|
spin_unlock_bh(&write_nodes_lock);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void clean_one_writequeue(struct nodeinfo *ni)
|
|
{
|
|
struct list_head *list;
|
|
struct list_head *temp;
|
|
|
|
spin_lock(&ni->writequeue_lock);
|
|
list_for_each_safe(list, temp, &ni->writequeue) {
|
|
struct writequeue_entry *e =
|
|
list_entry(list, struct writequeue_entry, list);
|
|
list_del(&e->list);
|
|
free_entry(e);
|
|
}
|
|
spin_unlock(&ni->writequeue_lock);
|
|
}
|
|
|
|
static void clean_writequeues(void)
|
|
{
|
|
int i;
|
|
|
|
for (i=1; i<=max_nodeid; i++) {
|
|
struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
|
|
if (ni)
|
|
clean_one_writequeue(ni);
|
|
}
|
|
}
|
|
|
|
|
|
static void dealloc_nodeinfo(void)
|
|
{
|
|
int i;
|
|
|
|
for (i=1; i<=max_nodeid; i++) {
|
|
struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
|
|
if (ni) {
|
|
idr_remove(&nodeinfo_idr, i);
|
|
kfree(ni);
|
|
}
|
|
}
|
|
}
|
|
|
|
int dlm_lowcomms_close(int nodeid)
|
|
{
|
|
struct nodeinfo *ni;
|
|
|
|
ni = nodeid2nodeinfo(nodeid, 0);
|
|
if (!ni)
|
|
return -1;
|
|
|
|
spin_lock(&ni->lock);
|
|
if (ni->assoc_id) {
|
|
ni->assoc_id = 0;
|
|
/* Don't send shutdown here, sctp will just queue it
|
|
till the node comes back up! */
|
|
}
|
|
spin_unlock(&ni->lock);
|
|
|
|
clean_one_writequeue(ni);
|
|
clear_bit(NI_INIT_PENDING, &ni->flags);
|
|
return 0;
|
|
}
|
|
|
|
// PJC: The work queue function for receiving.
|
|
static void process_recv_sockets(struct work_struct *work)
|
|
{
|
|
if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
|
|
int ret;
|
|
int count = 0;
|
|
|
|
do {
|
|
ret = receive_from_sock();
|
|
|
|
/* Don't starve out everyone else */
|
|
if (++count >= MAX_RX_MSG_COUNT) {
|
|
cond_resched();
|
|
count = 0;
|
|
}
|
|
} while (!kthread_should_stop() && ret >=0);
|
|
}
|
|
cond_resched();
|
|
}
|
|
|
|
// PJC: the work queue function for sending
|
|
static void process_send_sockets(struct work_struct *work)
|
|
{
|
|
if (sctp_con.eagain_flag) {
|
|
sctp_con.eagain_flag = 0;
|
|
refill_write_queue();
|
|
}
|
|
process_output_queue();
|
|
}
|
|
|
|
/*
 * PJC: Process lock requests from a particular node.
 * TODO: can we optimise this out on UP ??
 *
 * Currently an empty placeholder: it is installed as the handler of each
 * nodeinfo's lwork but does nothing.
 */
static void process_lock_request(struct work_struct *work)
{
	return;
}
|
|
|
|
static void daemons_stop(void)
|
|
{
|
|
destroy_workqueue(recv_workqueue);
|
|
destroy_workqueue(send_workqueue);
|
|
destroy_workqueue(lock_workqueue);
|
|
}
|
|
|
|
static int daemons_start(void)
|
|
{
|
|
int error;
|
|
recv_workqueue = create_workqueue("dlm_recv");
|
|
error = IS_ERR(recv_workqueue);
|
|
if (error) {
|
|
log_print("can't start dlm_recv %d", error);
|
|
return error;
|
|
}
|
|
|
|
send_workqueue = create_singlethread_workqueue("dlm_send");
|
|
error = IS_ERR(send_workqueue);
|
|
if (error) {
|
|
log_print("can't start dlm_send %d", error);
|
|
destroy_workqueue(recv_workqueue);
|
|
return error;
|
|
}
|
|
|
|
lock_workqueue = create_workqueue("dlm_rlock");
|
|
error = IS_ERR(lock_workqueue);
|
|
if (error) {
|
|
log_print("can't start dlm_rlock %d", error);
|
|
destroy_workqueue(send_workqueue);
|
|
destroy_workqueue(recv_workqueue);
|
|
return error;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* This is quite likely to sleep...
|
|
*/
|
|
int dlm_lowcomms_start(void)
|
|
{
|
|
int error;
|
|
|
|
INIT_WORK(&sctp_con.work, process_recv_sockets);
|
|
|
|
error = init_sock();
|
|
if (error)
|
|
goto fail_sock;
|
|
error = daemons_start();
|
|
if (error)
|
|
goto fail_sock;
|
|
return 0;
|
|
|
|
fail_sock:
|
|
close_connection();
|
|
return error;
|
|
}
|
|
|
|
void dlm_lowcomms_stop(void)
|
|
{
|
|
int i;
|
|
|
|
sctp_con.flags = 0x7;
|
|
daemons_stop();
|
|
clean_writequeues();
|
|
close_connection();
|
|
dealloc_nodeinfo();
|
|
max_nodeid = 0;
|
|
|
|
dlm_local_count = 0;
|
|
dlm_local_nodeid = 0;
|
|
|
|
for (i = 0; i < dlm_local_count; i++)
|
|
kfree(dlm_local_addr[i]);
|
|
}
|