path: root/net/rxrpc/call_object.c
diff options
authorDavid Howells <dhowells@redhat.com>2017-02-27 15:43:06 +0000
committerDavid S. Miller <davem@davemloft.net>2017-03-01 09:50:58 -0800
commit540b1c48c37ac0ad66212004db21e1ff7e2d78be (patch)
tree79fa02adc6864044347f0de00107397e455c8942 /net/rxrpc/call_object.c
parent2d6be4abf514fc26c83d239c7f31da1f95e4a31d (diff)
rxrpc: Fix deadlock between call creation and sendmsg/recvmsg
All the routines by which rxrpc is accessed from the outside are serialised by means of the socket lock (sendmsg, recvmsg, bind, rxrpc_kernel_begin_call(), ...) and this presents a problem: (1) If a number of calls on the same socket are in the process of connection to the same peer, a maximum of four concurrent live calls are permitted before further calls need to wait for a slot. (2) If a call is waiting for a slot, it is deep inside sendmsg() or rxrpc_kernel_begin_call() and the entry function is holding the socket lock. (3) sendmsg() and recvmsg() or the in-kernel equivalents are prevented from servicing the other calls as they need to take the socket lock to do so. (4) The socket is stuck until a call is aborted and makes its slot available to the waiter. Fix this by: (1) Provide each call with a mutex ('user_mutex') that arbitrates access by the users of rxrpc separately for each specific call. (2) Make rxrpc_sendmsg() and rxrpc_recvmsg() unlock the socket as soon as they've got a call and taken its mutex. Note that I'm returning EWOULDBLOCK from recvmsg() if MSG_DONTWAIT is set but someone else has the lock. Should I instead only return EWOULDBLOCK if there's nothing currently to be done on a socket, and sleep in this particular instance because there is something to be done, but we appear to be blocked by the interrupt handler doing its ping? (3) Make rxrpc_new_client_call() unlock the socket after allocating a new call, locking its user mutex and adding it to the socket's call tree. The call is returned locked so that sendmsg() can add data to it immediately. From the moment the call is in the socket tree, it is subject to access by sendmsg() and recvmsg() - even if it isn't connected yet. (4) Lock new service calls in the UDP data_ready handler (in rxrpc_new_incoming_call()) because they may already be in the socket's tree and the data_ready handler makes them live immediately if a user ID has already been preassigned. Note that the new call is locked before any notifications are sent that it is live, so doing mutex_trylock() *ought* to always succeed. Userspace is prevented from doing sendmsg() on calls that are in a too-early state in rxrpc_do_sendmsg(). (5) Make rxrpc_new_incoming_call() return the call with the user mutex held so that a ping can be scheduled immediately under it. Note that it might be worth moving the ping call into rxrpc_new_incoming_call() and then we can drop the mutex there. (6) Make rxrpc_accept_call() take the lock on the call it is accepting and release the socket after adding the call to the socket's tree. This is slightly tricky as we've dequeued the call by that point and have to requeue it. Note that requeuing emits a trace event. (7) Make rxrpc_kernel_send_data() and rxrpc_kernel_recv_data() take the new mutex immediately and don't bother with the socket mutex at all. This patch has the nice bonus that calls on the same socket are now to some extent parallelisable. Note that we might want to move rxrpc_service_prealloc() calls out from the socket lock and give it its own lock, so that we don't hang progress in other calls because we're waiting for the allocator. We probably also want to avoid calling rxrpc_notify_socket() from within the socket lock (rxrpc_accept_call()). Signed-off-by: David Howells <dhowells@redhat.com> Tested-by: Marc Dionne <marc.c.dionne@auristor.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rxrpc/call_object.c')
1 files changed, 16 insertions, 2 deletions
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 8b94db3c9b2e..d79cd36987a9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -115,6 +115,7 @@ struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
if (!call->rxtx_annotations)
goto nomem_2;
+ mutex_init(&call->user_mutex);
setup_timer(&call->timer, rxrpc_call_timer_expired,
(unsigned long)call);
INIT_WORK(&call->processor, &rxrpc_process_call);
@@ -194,14 +195,16 @@ static void rxrpc_start_call_timer(struct rxrpc_call *call)
- * set up a call for the given data
- * - called in process context with IRQs enabled
+ * Set up a call for the given parameters.
+ * - Called with the socket lock held, which it must release.
+ * - If it returns a call, the call's lock will need releasing by the caller.
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
struct rxrpc_conn_parameters *cp,
struct sockaddr_rxrpc *srx,
unsigned long user_call_ID,
gfp_t gfp)
+ __releases(&rx->sk.sk_lock.slock)
struct rxrpc_call *call, *xcall;
struct rb_node *parent, **pp;
@@ -212,6 +215,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
call = rxrpc_alloc_client_call(srx, gfp);
if (IS_ERR(call)) {
+ release_sock(&rx->sk);
_leave(" = %ld", PTR_ERR(call));
return call;
@@ -219,6 +223,11 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
trace_rxrpc_call(call, rxrpc_call_new_client, atomic_read(&call->usage),
here, (const void *)user_call_ID);
+ /* We need to protect a partially set up call against the user as we
+ * will be acting outside the socket lock.
+ */
+ mutex_lock(&call->user_mutex);
/* Publish the call, even though it is incompletely set up as yet */
@@ -250,6 +259,9 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
list_add_tail(&call->link, &rxrpc_calls);
+ /* From this point on, the call is protected by its own lock. */
+ release_sock(&rx->sk);
/* Set up or get a connection record and set the protocol parameters,
* including channel number and call ID.
@@ -279,6 +291,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
+ release_sock(&rx->sk);
ret = -EEXIST;
@@ -287,6 +300,7 @@ error:
trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
here, ERR_PTR(ret));
rxrpc_release_call(rx, call);
+ mutex_unlock(&call->user_mutex);
rxrpc_put_call(call, rxrpc_call_put);
_leave(" = %d", ret);
return ERR_PTR(ret);