libgm2/libm2iso/RTco.cc (re-implementation) Bugfix for [PR108835]

This is a re-implementation of RTco.cc which fixes the race hazzard
seen occasionally when running testtransfer and coroutines from the
modula2 testsuite.

libgm2/ChangeLog:

	PR testsuite/108835
	* libm2iso/RTco.cc: Re-implementation using a single lock
	mutex and inlined wait/signal implementation within
	transfer.

Signed-off-by: Gaius Mulley <gaiusmod2@gmail.com>
This commit is contained in:
Gaius Mulley 2023-02-19 22:08:31 +00:00
parent 0263e9d5d8
commit b9c83e7789

View File

@ -1,4 +1,4 @@
/* RTco.c provides minimal access to thread primitives.
/* RTco.cc provides minimal access to thread primitives.
Copyright (C) 2019-2022 Free Software Foundation, Inc.
Contributed by Gaius Mulley <gaius.mulley@southwales.ac.uk>.
@ -30,8 +30,19 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
#include <sys/select.h>
#include <stdlib.h>
#include <m2rts.h>
#include <cstdio>
// #define TRACEON
#define EXPORT(FUNC) RTco_ ## FUNC
#define M2EXPORT(FUNC) _M2_RTco_ ## FUNC
/* This implementation of RTco.cc uses a single lock for mutex across
the whole module. It also forces context switching between threads
in transfer by combining an implementation of wait and signal.
All semaphores are implemented using the same mutex lock and
separate condition variables. */
#undef TRACEON
#define POOL
#define SEM_POOL 10000
@ -63,19 +74,22 @@ see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
#define tprintf(...)
#endif
typedef struct threadCB_s
{
void (*proc) (void);
int execution;
pthread_t p;
int tid;
int tid; /* The thread id. */
unsigned int interruptLevel;
__gthread_cond_t run_counter; /* Used to block the thread and force
a context switch. */
int value; /* Count 0 or 1. */
bool waiting; /* Is this thread waiting on the run_counter? */
} threadCB;
typedef struct threadSem_s
{
__gthread_mutex_t mutex;
__gthread_cond_t counter;
int waiting;
int sem_value;
@ -87,25 +101,27 @@ static unsigned int nSemaphores = 0;
static threadSem **semArray = NULL;
/* These are used to lock the above module data structures. */
static threadSem lock;
static __gthread_mutex_t lock; /* This is the only mutex for
the whole module. */
static int initialized = FALSE;
static int currentThread = 0;
extern "C" int RTco_init (void);
extern "C" int EXPORT(init) (void);
extern "C" void
_M2_RTco_dep (void)
M2EXPORT(dep) (void)
{
}
extern "C" void
_M2_RTco_init (int argc, char *argv[], char *envp[])
M2EXPORT(init) (int argc, char *argv[], char *envp[])
{
}
extern "C" void
_M2_RTco_fini (int argc, char *argv[], char *envp[])
M2EXPORT(fini) (int argc, char *argv[], char *envp[])
{
}
@ -114,7 +130,6 @@ static void
initSem (threadSem *sem, int value)
{
__GTHREAD_COND_INIT_FUNCTION (&sem->counter);
__GTHREAD_MUTEX_INIT_FUNCTION (&sem->mutex);
sem->waiting = FALSE;
sem->sem_value = value;
}
@ -122,43 +137,41 @@ initSem (threadSem *sem, int value)
static void
waitSem (threadSem *sem)
{
__gthread_mutex_lock (&sem->mutex);
__gthread_mutex_lock (&lock);
if (sem->sem_value == 0)
{
sem->waiting = TRUE;
__gthread_cond_wait (&sem->counter, &sem->mutex);
__gthread_cond_wait (&sem->counter, &lock);
sem->waiting = FALSE;
}
else
sem->sem_value--;
__gthread_mutex_unlock (&sem->mutex);
__gthread_mutex_unlock (&lock);
}
static void
signalSem (threadSem *sem)
{
__gthread_mutex_unlock (&sem->mutex);
__gthread_mutex_lock (&lock);
if (sem->waiting)
__gthread_cond_signal (&sem->counter);
else
sem->sem_value++;
__gthread_mutex_unlock (&sem->mutex);
__gthread_mutex_unlock (&lock);
}
void stop (void) {}
extern "C" void
RTco_wait (int sid)
EXPORT(wait) (int sid)
{
RTco_init ();
EXPORT(init) ();
tprintf ("wait %d\n", sid);
waitSem (semArray[sid]);
}
extern "C" void
RTco_signal (int sid)
EXPORT(signal) (int sid)
{
RTco_init ();
EXPORT(init) ();
tprintf ("signal %d\n", sid);
signalSem (semArray[sid]);
}
@ -207,90 +220,58 @@ initSemaphore (int value)
}
extern "C" int
RTco_initSemaphore (int value)
EXPORT(initSemaphore) (int value)
{
int sid;
RTco_init ();
waitSem (&lock);
tprintf ("initSemaphore (%d) called\n", value);
EXPORT(init) ();
tprintf ("about to access lock\n");
__gthread_mutex_lock (&lock);
sid = initSemaphore (value);
signalSem (&lock);
__gthread_mutex_unlock (&lock);
return sid;
}
/* signalThread signal the semaphore associated with thread tid. */
extern "C" void
RTco_signalThread (int tid)
{
int sem;
RTco_init ();
tprintf ("signalThread %d\n", tid);
waitSem (&lock);
sem = threadArray[tid].execution;
signalSem (&lock);
RTco_signal (sem);
}
/* waitThread wait on the semaphore associated with thread tid. */
extern "C" void
RTco_waitThread (int tid)
{
RTco_init ();
tprintf ("waitThread %d\n", tid);
RTco_wait (threadArray[tid].execution);
}
extern "C" int
currentThread (void)
EXPORT(currentThread) (void)
{
int tid;
for (tid = 0; tid < nThreads; tid++)
if (pthread_self () == threadArray[tid].p)
return tid;
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
"failed to find currentThread");
}
extern "C" int
RTco_currentThread (void)
{
int tid;
RTco_init ();
waitSem (&lock);
tid = currentThread ();
EXPORT(init) ();
__gthread_mutex_lock (&lock);
tid = currentThread;
tprintf ("currentThread %d\n", tid);
signalSem (&lock);
__gthread_mutex_unlock (&lock);
return tid;
}
/* currentInterruptLevel returns the interrupt level of the current thread. */
extern "C" unsigned int
RTco_currentInterruptLevel (void)
EXPORT(currentInterruptLevel) (void)
{
RTco_init ();
EXPORT(init) ();
__gthread_mutex_lock (&lock);
tprintf ("currentInterruptLevel %d\n",
threadArray[RTco_currentThread ()].interruptLevel);
return threadArray[RTco_currentThread ()].interruptLevel;
threadArray[currentThread].interruptLevel);
int level = threadArray[currentThread].interruptLevel;
__gthread_mutex_unlock (&lock);
return level;
}
/* turninterrupts returns the old interrupt level and assigns the
interrupt level to newLevel. */
extern "C" unsigned int
RTco_turnInterrupts (unsigned int newLevel)
EXPORT(turnInterrupts) (unsigned int newLevel)
{
int tid = RTco_currentThread ();
unsigned int old = RTco_currentInterruptLevel ();
EXPORT(init) ();
__gthread_mutex_lock (&lock);
unsigned int old = threadArray[currentThread].interruptLevel;
tprintf ("turnInterrupts from %d to %d\n", old, newLevel);
waitSem (&lock);
threadArray[tid].interruptLevel = newLevel;
signalSem (&lock);
threadArray[currentThread].interruptLevel = newLevel;
__gthread_mutex_unlock (&lock);
return old;
}
@ -306,12 +287,30 @@ execThread (void *t)
{
threadCB *tp = (threadCB *)t;
tprintf ("exec thread tid = %d coming to life\n", tp->tid);
__gthread_mutex_lock (&lock);
tprintf ("exec thread tid = %d function = 0x%p arg = 0x%p\n", tp->tid,
tp->proc, t);
RTco_waitThread (
tp->tid); /* Forcing this thread to block, waiting to be scheduled. */
tprintf (" exec thread [%d] function = 0x%p arg = 0x%p\n", tp->tid,
/* Has the thread been signalled? */
if (tp->value == 0)
{
/* Not been signalled therefore we force ourselves to block. */
tprintf ("%s: forcing thread tid = %d to wait\n",
__FUNCTION__, tp->tid);
tp->waiting = true; /* We are waiting. */
__gthread_cond_wait (&tp->run_counter, &lock);
tp->waiting = false; /* Running again. */
}
else
{
/* Yes signalled, therefore just take the recorded signal and continue. */
tprintf ("%s: no need for thread tid = %d to wait\n",
__FUNCTION__, tp->tid);
tp->value--;
}
tprintf (" running exec thread [%d] function = 0x%p arg = 0x%p\n", tp->tid,
tp->proc, t);
__gthread_mutex_unlock (&lock);
tp->proc (); /* Now execute user procedure. */
#if 0
M2RTS_CoroutineException ( __FILE__, __LINE__, __COLUMN__, __FUNCTION__, "coroutine finishing");
@ -356,21 +355,24 @@ initThread (void (*proc) (void), unsigned int stackSize,
threadArray[tid].proc = proc;
threadArray[tid].tid = tid;
threadArray[tid].execution = initSemaphore (0);
/* Initialize the thread run_counter used to force a context switch. */
__GTHREAD_COND_INIT_FUNCTION (&threadArray[tid].run_counter);
threadArray[tid].interruptLevel = interrupt;
threadArray[tid].waiting = false; /* The thread is running. */
threadArray[tid].value = 0; /* No signal has been seen yet. */
/* set thread creation attributes. */
/* Set thread creation attributes. */
result = pthread_attr_init (&attr);
if (result != 0)
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
"failed to create thread attribute");
"failed to create thread attribute");
if (stackSize > 0)
{
result = pthread_attr_setstacksize (&attr, stackSize);
if (result != 0)
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
"failed to set stack size attribute");
"failed to set stack size attribute");
}
tprintf ("initThread [%d] function = 0x%p (arg = 0x%p)\n", tid, proc,
@ -385,85 +387,132 @@ initThread (void (*proc) (void), unsigned int stackSize,
}
extern "C" int
RTco_initThread (void (*proc) (void), unsigned int stackSize,
unsigned int interrupt)
EXPORT(initThread) (void (*proc) (void), unsigned int stackSize,
unsigned int interrupt)
{
int tid;
RTco_init ();
waitSem (&lock);
EXPORT(init) ();
__gthread_mutex_lock (&lock);
tid = initThread (proc, stackSize, interrupt);
signalSem (&lock);
__gthread_mutex_unlock (&lock);
return tid;
}
/* transfer unlocks thread p2 and locks the current thread. p1 is
updated with the current thread id. */
updated with the current thread id.
The implementation of transfer uses a combined wait/signal. */
extern "C" void
RTco_transfer (int *p1, int p2)
EXPORT(transfer) (int *p1, int p2)
{
int tid = currentThread ();
if (!initialized)
M2RTS_HaltC (
__FILE__, __LINE__, __FUNCTION__,
"cannot transfer to a process before the process has been created");
if (tid == p2)
{
/* error. */
__gthread_mutex_lock (&lock);
{
if (!initialized)
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
"attempting to transfer to ourself");
}
else
{
*p1 = tid;
tprintf ("start, context switching from: %d to %d\n", tid, p2);
RTco_signalThread (p2);
RTco_waitThread (tid);
tprintf ("end, context back to %d\n", tid);
"cannot transfer to a process before the process has been created");
if (currentThread == p2)
{
/* Error. */
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
"attempting to transfer to ourself");
}
else
{
*p1 = currentThread;
int old = currentThread;
tprintf ("start, context switching from: %d to %d\n", currentThread, p2);
/* Perform signal (p2 sem). Without the mutex lock as we have
already obtained it above. */
if (threadArray[p2].waiting)
{
/* p2 is blocked on the condition variable, release it. */
tprintf ("p1 = %d cond_signal to p2 (%d)\n", currentThread, p2);
__gthread_cond_signal (&threadArray[p2].run_counter);
tprintf ("after p1 = %d cond_signal to p2 (%d)\n", currentThread, p2);
}
else
{
/* p2 hasn't reached the condition variable, so bump value
ready for p2 to test. */
tprintf ("no need for thread %d to cond_signal - bump %d value (pre) = %d\n",
currentThread, p2, threadArray[p2].value);
threadArray[p2].value++;
}
/* Perform wait (old sem). Again without obtaining mutex as
we've already claimed it. */
if (threadArray[old].value == 0)
{
currentThread = p2;
/* Record we are about to wait on the condition variable. */
threadArray[old].waiting = true;
__gthread_cond_wait (&threadArray[old].run_counter, &lock);
threadArray[old].waiting = false;
/* We are running again. */
currentThread = old;
}
else
{
tprintf ("(currentThread = %d) no need for thread %d to cond_wait - taking value (pre) = %d\n",
currentThread, old, threadArray[old].value);
/* No need to block as we have been told a signal has
effectively already been recorded. We remove the signal
notification without blocking. */
threadArray[old].value--;
}
tprintf ("end, context back to %d\n", currentThread);
if (currentThread != old)
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
"wrong process id");
}
}
__gthread_mutex_unlock (&lock);
}
extern "C" int
RTco_select (int p1, fd_set *p2, fd_set *p3, fd_set *p4, const timespec *p5)
EXPORT(select) (int p1, fd_set *p2, fd_set *p3, fd_set *p4, const timespec *p5)
{
RTco_init ();
EXPORT(init) ();
tprintf ("[%x] RTco.select (...)\n", pthread_self ());
return pselect (p1, p2, p3, p4, p5, NULL);
}
extern "C" int
RTco_init (void)
EXPORT(init) (void)
{
tprintf ("checking init\n");
if (! initialized)
{
int tid;
initialized = TRUE;
tprintf ("RTco initialized\n");
initSem (&lock, 0);
__GTHREAD_MUTEX_INIT_FUNCTION (&lock);
__gthread_mutex_lock (&lock);
/* Create initial thread container. */
#if defined(POOL)
threadArray = (threadCB *)malloc (sizeof (threadCB) * THREAD_POOL);
semArray = (threadSem **)malloc (sizeof (threadSem *) * SEM_POOL);
#endif
tid = newThread (); /* For the current initial thread. */
threadArray[tid].tid = tid;
threadArray[tid].execution = initSemaphore (0);
threadArray[tid].p = pthread_self ();
threadArray[tid].interruptLevel = 0;
threadArray[tid].proc
= never; /* This shouldn't happen as we are already running. */
initialized = TRUE;
/* Create a thread control block for the main program (or process). */
currentThread = newThread (); /* For the current initial thread. */
threadArray[currentThread].p = pthread_self ();
threadArray[currentThread].tid = currentThread;
__GTHREAD_COND_INIT_FUNCTION (&threadArray[currentThread].run_counter);
threadArray[currentThread].interruptLevel = 0;
/* The line below shouldn't be necessary as we are already running. */
threadArray[currentThread].proc = never;
threadArray[currentThread].waiting = false; /* We are running. */
threadArray[currentThread].value = 0; /* No signal from anyone yet. */
tprintf ("RTco initialized completed\n");
signalSem (&lock);
__gthread_mutex_unlock (&lock);
}
return 0;
}
extern "C" void __attribute__((__constructor__))
_M2_RTco_ctor (void)
M2EXPORT(ctor) (void)
{
M2RTS_RegisterModule ("RTco", _M2_RTco_init, _M2_RTco_fini,
_M2_RTco_dep);
M2RTS_RegisterModule ("RTco",
M2EXPORT(init), M2EXPORT(fini),
M2EXPORT(dep));
}