Merge branch 'jk/fsmonitor-event-listener-race-fix' into maint-2.47

On macOS, fsmonitor can fall into a race condition that results in
a client waiting forever to be notified for an event that have
already happened.  This problem has been corrected.

* jk/fsmonitor-event-listener-race-fix:
  fsmonitor: initialize fs event listener before accepting clients
  simple-ipc: split async server initialization and running
This commit is contained in:
Junio C Hamano 2024-11-20 14:42:57 +09:00
commit f1a50f12b9
7 changed files with 98 additions and 18 deletions

View File

@ -1208,9 +1208,9 @@ static int fsmonitor_run_daemon_1(struct fsmonitor_daemon_state *state)
* system event listener thread so that we have the IPC handle * system event listener thread so that we have the IPC handle
* before we need it. * before we need it.
*/ */
if (ipc_server_run_async(&state->ipc_server_data, if (ipc_server_init_async(&state->ipc_server_data,
state->path_ipc.buf, &ipc_opts, state->path_ipc.buf, &ipc_opts,
handle_client, state)) handle_client, state))
return error_errno( return error_errno(
_("could not start IPC thread pool on '%s'"), _("could not start IPC thread pool on '%s'"),
state->path_ipc.buf); state->path_ipc.buf);

View File

@ -516,6 +516,12 @@ void fsm_listen__loop(struct fsmonitor_daemon_state *state)
} }
data->stream_started = 1; data->stream_started = 1;
/*
* Our fs event listener is now running, so it's safe to start
* serving client requests.
*/
ipc_server_start_async(state->ipc_server_data);
pthread_mutex_lock(&data->dq_lock); pthread_mutex_lock(&data->dq_lock);
pthread_cond_wait(&data->dq_finished, &data->dq_lock); pthread_cond_wait(&data->dq_finished, &data->dq_lock);
pthread_mutex_unlock(&data->dq_lock); pthread_mutex_unlock(&data->dq_lock);

View File

@ -741,6 +741,12 @@ void fsm_listen__loop(struct fsmonitor_daemon_state *state)
start_rdcw_watch(data->watch_gitdir) == -1) start_rdcw_watch(data->watch_gitdir) == -1)
goto force_error_stop; goto force_error_stop;
/*
* Now that we've established the rdcw watches, we can start
* serving clients.
*/
ipc_server_start_async(state->ipc_server_data);
for (;;) { for (;;) {
dwWait = WaitForMultipleObjects(data->nr_listener_handles, dwWait = WaitForMultipleObjects(data->nr_listener_handles,
data->hListener, data->hListener,

View File

@ -16,11 +16,12 @@ int ipc_server_run(const char *path, const struct ipc_server_opts *opts,
struct ipc_server_data *server_data = NULL; struct ipc_server_data *server_data = NULL;
int ret; int ret;
ret = ipc_server_run_async(&server_data, path, opts, ret = ipc_server_init_async(&server_data, path, opts,
application_cb, application_data); application_cb, application_data);
if (ret) if (ret)
return ret; return ret;
ipc_server_start_async(server_data);
ret = ipc_server_await(server_data); ret = ipc_server_await(server_data);
ipc_server_free(server_data); ipc_server_free(server_data);

View File

@ -328,6 +328,7 @@ struct ipc_server_data {
int back_pos; int back_pos;
int front_pos; int front_pos;
int started;
int shutdown_requested; int shutdown_requested;
int is_stopped; int is_stopped;
}; };
@ -824,10 +825,10 @@ static int setup_listener_socket(
/* /*
* Start IPC server in a pool of background threads. * Start IPC server in a pool of background threads.
*/ */
int ipc_server_run_async(struct ipc_server_data **returned_server_data, int ipc_server_init_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts, const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb, ipc_server_application_cb *application_cb,
void *application_data) void *application_data)
{ {
struct unix_ss_socket *server_socket = NULL; struct unix_ss_socket *server_socket = NULL;
struct ipc_server_data *server_data; struct ipc_server_data *server_data;
@ -888,6 +889,12 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
server_data->accept_thread->fd_send_shutdown = sv[0]; server_data->accept_thread->fd_send_shutdown = sv[0];
server_data->accept_thread->fd_wait_shutdown = sv[1]; server_data->accept_thread->fd_wait_shutdown = sv[1];
/*
* Hold work-available mutex so that no work can start until
* we unlock it.
*/
pthread_mutex_lock(&server_data->work_available_mutex);
if (pthread_create(&server_data->accept_thread->pthread_id, NULL, if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
accept_thread_proc, server_data->accept_thread)) accept_thread_proc, server_data->accept_thread))
die_errno(_("could not start accept_thread '%s'"), path); die_errno(_("could not start accept_thread '%s'"), path);
@ -918,6 +925,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
return 0; return 0;
} }
void ipc_server_start_async(struct ipc_server_data *server_data)
{
if (!server_data || server_data->started)
return;
server_data->started = 1;
pthread_mutex_unlock(&server_data->work_available_mutex);
}
/* /*
* Gently tell the IPC server treads to shutdown. * Gently tell the IPC server treads to shutdown.
* Can be run on any thread. * Can be run on any thread.
@ -933,7 +949,9 @@ int ipc_server_stop_async(struct ipc_server_data *server_data)
trace2_region_enter("ipc-server", "server-stop-async", NULL); trace2_region_enter("ipc-server", "server-stop-async", NULL);
pthread_mutex_lock(&server_data->work_available_mutex); /* If we haven't started yet, we are already holding lock. */
if (server_data->started)
pthread_mutex_lock(&server_data->work_available_mutex);
server_data->shutdown_requested = 1; server_data->shutdown_requested = 1;

View File

@ -371,6 +371,9 @@ struct ipc_server_data {
HANDLE hEventStopRequested; HANDLE hEventStopRequested;
struct ipc_server_thread_data *thread_list; struct ipc_server_thread_data *thread_list;
int is_stopped; int is_stopped;
pthread_mutex_t startup_barrier;
int started;
}; };
enum connect_result { enum connect_result {
@ -526,6 +529,16 @@ static int use_connection(struct ipc_server_thread_data *server_thread_data)
return ret; return ret;
} }
static void wait_for_startup_barrier(struct ipc_server_data *server_data)
{
/*
* Temporarily hold the startup_barrier mutex before starting,
* which lets us know that it's OK to start serving requests.
*/
pthread_mutex_lock(&server_data->startup_barrier);
pthread_mutex_unlock(&server_data->startup_barrier);
}
/* /*
* Thread proc for an IPC server worker thread. It handles a series of * Thread proc for an IPC server worker thread. It handles a series of
* connections from clients. It cleans and reuses the hPipe between each * connections from clients. It cleans and reuses the hPipe between each
@ -550,6 +563,8 @@ static void *server_thread_proc(void *_server_thread_data)
memset(&oConnect, 0, sizeof(oConnect)); memset(&oConnect, 0, sizeof(oConnect));
oConnect.hEvent = hEventConnected; oConnect.hEvent = hEventConnected;
wait_for_startup_barrier(server_thread_data->server_data);
for (;;) { for (;;) {
cr = wait_for_connection(server_thread_data, &oConnect); cr = wait_for_connection(server_thread_data, &oConnect);
@ -752,10 +767,10 @@ static HANDLE create_new_pipe(wchar_t *wpath, int is_first)
return hPipe; return hPipe;
} }
int ipc_server_run_async(struct ipc_server_data **returned_server_data, int ipc_server_init_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts, const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb, ipc_server_application_cb *application_cb,
void *application_data) void *application_data)
{ {
struct ipc_server_data *server_data; struct ipc_server_data *server_data;
wchar_t wpath[MAX_PATH]; wchar_t wpath[MAX_PATH];
@ -787,6 +802,13 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
strbuf_addstr(&server_data->buf_path, path); strbuf_addstr(&server_data->buf_path, path);
wcscpy(server_data->wpath, wpath); wcscpy(server_data->wpath, wpath);
/*
* Hold the startup_barrier lock so that no threads will progress
* until ipc_server_start_async() is called.
*/
pthread_mutex_init(&server_data->startup_barrier, NULL);
pthread_mutex_lock(&server_data->startup_barrier);
if (nr_threads < 1) if (nr_threads < 1)
nr_threads = 1; nr_threads = 1;
@ -837,6 +859,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
return 0; return 0;
} }
void ipc_server_start_async(struct ipc_server_data *server_data)
{
if (!server_data || server_data->started)
return;
server_data->started = 1;
pthread_mutex_unlock(&server_data->startup_barrier);
}
int ipc_server_stop_async(struct ipc_server_data *server_data) int ipc_server_stop_async(struct ipc_server_data *server_data)
{ {
if (!server_data) if (!server_data)
@ -850,6 +881,13 @@ int ipc_server_stop_async(struct ipc_server_data *server_data)
* We DO NOT attempt to force them to drop an active connection. * We DO NOT attempt to force them to drop an active connection.
*/ */
SetEvent(server_data->hEventStopRequested); SetEvent(server_data->hEventStopRequested);
/*
* If we haven't yet told the threads they are allowed to run,
* do so now, so they can receive the shutdown event.
*/
ipc_server_start_async(server_data);
return 0; return 0;
} }
@ -900,5 +938,7 @@ void ipc_server_free(struct ipc_server_data *server_data)
free(std); free(std);
} }
pthread_mutex_destroy(&server_data->startup_barrier);
free(server_data); free(server_data);
} }

View File

@ -179,11 +179,20 @@ struct ipc_server_opts
* When a client IPC message is received, the `application_cb` will be * When a client IPC message is received, the `application_cb` will be
* called (possibly on a random thread) to handle the message and * called (possibly on a random thread) to handle the message and
* optionally compose a reply message. * optionally compose a reply message.
*
* This initializes all threads but no actual work will be done until
* ipc_server_start_async() is called.
*/ */
int ipc_server_run_async(struct ipc_server_data **returned_server_data, int ipc_server_init_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts, const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb, ipc_server_application_cb *application_cb,
void *application_data); void *application_data);
/*
* Let an async server start running. This needs to be called only once
* after initialization.
*/
void ipc_server_start_async(struct ipc_server_data *server_data);
/* /*
* Gently signal the IPC server pool to shutdown. No new client * Gently signal the IPC server pool to shutdown. No new client