tipc: change sk_buffer handling in tipc_link_xmit()

When the function tipc_link_xmit() is given a buffer list for
transmission, it currently consumes the list both when transmission
is successful and when it fails, except for the special case when
it encounters link congestion.

This behavior is inconsistent, and needs to be corrected if we want
to avoid problems in later commits in this series.

In this commit, we change this to let the function consume the list
only when transmission is successful, and leave the list with the
sender in all other cases. We also modifiy the socket code so that
it adapts to this change, i.e., purges the list when a non-congestion
error code is returned.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Jon Paul Maloy 2015-07-16 16:54:23 -04:00 committed by David S. Miller
parent 36e78a463b
commit 22d85c7942
3 changed files with 37 additions and 40 deletions

View File

@ -358,10 +358,9 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
/* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list);
if (unlikely(!skb)) {
__skb_queue_purge(list);
if (unlikely(!skb))
return -EHOSTUNREACH;
}
/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);

View File

@ -340,7 +340,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
* @link: congested link
* @list: message that was attempted sent
* Create pseudo msg to send back to user when congestion abates
* Only consumes message if there is an error
* Does not consume buffer list
*/
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
{
@ -354,7 +354,7 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
goto err;
return -ENOBUFS;
}
/* Non-blocking sender: */
if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
@ -364,15 +364,12 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
addr, addr, oport, 0, 0);
if (!skb)
goto err;
return -ENOBUFS;
TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
TIPC_SKB_CB(skb)->chain_imp = imp;
skb_queue_tail(&link->wakeupq, skb);
link->stats.link_congs++;
return -ELINKCONG;
err:
__skb_queue_purge(list);
return -ENOBUFS;
}
/**
@ -641,8 +638,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
* @link: link to use
* @list: chain of buffers containing message
*
* Consumes the buffer chain, except when returning -ELINKCONG,
* since the caller then may want to make more send attempts.
* Consumes the buffer chain, except when returning an error code,
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
@ -666,10 +662,9 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
return link_schedule_user(link, list);
}
if (unlikely(msg_size(msg) > mtu)) {
__skb_queue_purge(list);
if (unlikely(msg_size(msg) > mtu))
return -EMSGSIZE;
}
/* Prepare each packet for sending, and add to relevant queue: */
while (skb_queue_len(list)) {
skb = skb_peek(list);
@ -722,7 +717,7 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
/* tipc_link_xmit_skb(): send single buffer to destination
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
* messages, which will not be rejected
* messages, which will not cause link congestion
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
@ -736,7 +731,7 @@ int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
skb2list(skb, &head);
rc = tipc_link_xmit(net, &head, dnode, selector);
if (rc == -ELINKCONG)
if (rc)
kfree_skb(skb);
return 0;
}
@ -748,7 +743,7 @@ int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
* Consumes the buffer chain, except when returning -ELINKCONG
* Consumes the buffer chain, except when returning error
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,

View File

@ -686,21 +686,22 @@ new_mtu:
do {
rc = tipc_bclink_xmit(net, pktchain);
if (likely(rc >= 0)) {
rc = dsz;
break;
if (likely(!rc))
return dsz;
if (rc == -ELINKCONG) {
tsk->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo);
if (!rc)
continue;
}
__skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
msg->msg_iter = save;
goto new_mtu;
}
if (rc != -ELINKCONG)
break;
tipc_sk(sk)->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo);
if (rc)
__skb_queue_purge(pktchain);
} while (!rc);
break;
} while (1);
return rc;
}
@ -925,23 +926,24 @@ new_mtu:
skb = skb_peek(pktchain);
TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
if (likely(rc >= 0)) {
if (likely(!rc)) {
if (sock->state != SS_READY)
sock->state = SS_CONNECTING;
rc = dsz;
break;
return dsz;
}
if (rc == -ELINKCONG) {
tsk->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo);
if (!rc)
continue;
}
__skb_queue_purge(pktchain);
if (rc == -EMSGSIZE) {
m->msg_iter = save;
goto new_mtu;
}
if (rc != -ELINKCONG)
break;
tsk->link_cong = 1;
rc = tipc_wait_for_sndmsg(sock, &timeo);
if (rc)
__skb_queue_purge(pktchain);
} while (!rc);
break;
} while (1);
return rc;
}
@ -1048,10 +1050,11 @@ next:
tsk->sent_unacked++;
sent += send;
if (sent == dsz)
break;
return dsz;
goto next;
}
if (rc == -EMSGSIZE) {
__skb_queue_purge(pktchain);
tsk->max_pkt = tipc_node_get_mtu(net, dnode,
portid);
m->msg_iter = save;
@ -1059,13 +1062,13 @@ next:
}
if (rc != -ELINKCONG)
break;
tsk->link_cong = 1;
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
if (rc)
__skb_queue_purge(pktchain);
} while (!rc);
__skb_queue_purge(pktchain);
return sent ? sent : rc;
}