mirror of
https://github.com/robotn/gohook.git
synced 2024-11-24 21:36:50 +08:00
1439 lines
52 KiB
C
1439 lines
52 KiB
C
// #######################################################
|
|
// ## Generated by merge_src from the following files:
|
|
// ## eb_assert.c
|
|
// ## eb_assert.h
|
|
// ## eb_atomic.h
|
|
// ## eb_chan.c
|
|
// ## eb_chan.h
|
|
// ## eb_nsec.h
|
|
// ## eb_port.c
|
|
// ## eb_port.h
|
|
// ## eb_spinlock.h
|
|
// ## eb_sys.c
|
|
// ## eb_sys.h
|
|
// ## eb_time.c
|
|
// ## eb_time.h
|
|
// #######################################################
|
|
|
|
// #######################################################
|
|
// ## eb_chan.h
|
|
// #######################################################
|
|
|
|
#ifndef EB_CHAN_H
|
|
#define EB_CHAN_H
|
|
|
|
#include <stddef.h>
|
|
#include <stdbool.h>
|
|
// #######################################################
|
|
// ## eb_nsec.h
|
|
// #######################################################
|
|
|
|
#include <stdint.h>
|
|
|
|
typedef uint64_t eb_nsec; /* Units of nanoseconds */
|
|
#define eb_nsec_zero UINT64_C(0)
|
|
#define eb_nsec_forever UINT64_MAX
|
|
#define eb_nsec_per_sec UINT64_C(1000000000)
|
|
|
|
/* ## Types */
|
|
typedef enum {
|
|
eb_chan_res_ok, /* Success */
|
|
eb_chan_res_closed, /* Failed because the channel is closed */
|
|
eb_chan_res_stalled, /* Failed because the send/recv couldn't proceed without blocking (applies to _try_send()/_try_recv()) */
|
|
} eb_chan_res;
|
|
|
|
typedef struct eb_chan *eb_chan;
|
|
typedef struct {
|
|
eb_chan chan; /* The applicable channel, where NULL channels block forever */
|
|
bool send; /* True if sending, false if receiving */
|
|
eb_chan_res res; /* _ok if the op completed due to a successful send/recv operation, _closed if the op completed because the channel is closed. */
|
|
const void *val; /* The value to be sent/the value that was received */
|
|
} eb_chan_op;
|
|
|
|
/* ## Channel creation/lifecycle */
|
|
eb_chan eb_chan_create(size_t buf_cap);
|
|
eb_chan eb_chan_retain(eb_chan c);
|
|
void eb_chan_release(eb_chan c);
|
|
|
|
/* ## Channel closing */
|
|
/* Returns _ok on success, or _closed if the channel was already closed. */
|
|
eb_chan_res eb_chan_close(eb_chan c);
|
|
|
|
/* ## Getters */
|
|
size_t eb_chan_buf_cap(eb_chan c);
|
|
size_t eb_chan_buf_len(eb_chan c);
|
|
|
|
/* ## Sending/receiving */
|
|
/* Send/receive a value on a channel (where _send()/_recv() are blocking and _try_send()/_try_recv() are non-blocking) */
|
|
eb_chan_res eb_chan_send(eb_chan c, const void *val);
|
|
eb_chan_res eb_chan_try_send(eb_chan c, const void *val);
|
|
eb_chan_res eb_chan_recv(eb_chan c, const void **val);
|
|
eb_chan_res eb_chan_try_recv(eb_chan c, const void **val);
|
|
|
|
/* ## Multiplexing */
|
|
/* _select_list() performs at most one of the operations in the supplied list, and returns the one that was performed.
|
|
It returns NULL if no operation was performed before the timeout. */
|
|
eb_chan_op *eb_chan_select_list(eb_nsec timeout, eb_chan_op *const ops[], size_t nops);
|
|
|
|
/* _select() is a convenience macro that wraps _select_list() to avoid having to manually create an array of ops on the stack.
|
|
For example:
|
|
eb_chan_op op1 = eb_chan_op_send(c1, NULL);
|
|
eb_chan_op op2 = eb_chan_op_recv(c2);
|
|
eb_chan_op *result = eb_chan_select(timeout, &op1, &op2);
|
|
...
|
|
*/
|
|
#define eb_chan_select(timeout, ...) ({ \
|
|
eb_chan_op *const eb_chan_select_ops[] = {__VA_ARGS__}; \
|
|
eb_chan_select_list(timeout, eb_chan_select_ops, (sizeof(eb_chan_select_ops) / sizeof(*eb_chan_select_ops))); \
|
|
})
|
|
|
|
/* Return initialized send/recv ops for use with _select() */
|
|
static inline eb_chan_op eb_chan_op_send(eb_chan c, const void *val) {
|
|
return (eb_chan_op){.chan = c, .send = true, .res = eb_chan_res_closed, .val = val};
|
|
}
|
|
|
|
static inline eb_chan_op eb_chan_op_recv(eb_chan c) {
|
|
return (eb_chan_op){.chan = c, .send = false, .res = eb_chan_res_closed, .val = NULL};
|
|
}
|
|
|
|
// #######################################################
|
|
// ## eb_chan.c
|
|
// #######################################################
|
|
#include <stdlib.h>
|
|
#include <assert.h>
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <sched.h>
|
|
// #######################################################
|
|
// ## eb_assert.h
|
|
// #######################################################
|
|
|
|
#include <stdbool.h>
|
|
#include <stdint.h>
|
|
|
|
#define eb_no_op
|
|
|
|
#define eb_assert_or_recover(cond, action) ({ \
|
|
if (!(cond)) { \
|
|
eb_assert_print("Assertion failed", #cond, __FILE__, (uintmax_t)__LINE__, __PRETTY_FUNCTION__); \
|
|
action; \
|
|
} \
|
|
})
|
|
|
|
#define eb_assert_or_bail(cond, msg) ({ \
|
|
if (!(cond)) { \
|
|
eb_assert_print(msg, #cond, __FILE__, (uintmax_t)__LINE__, __PRETTY_FUNCTION__); \
|
|
abort(); \
|
|
} \
|
|
})
|
|
|
|
void eb_assert_print(const char *msg, const char *cond, const char *file, uintmax_t line, const char *func);
|
|
// #######################################################
|
|
// ## eb_assert.c
|
|
// #######################################################
|
|
|
|
#include <stdio.h>
|
|
|
|
void eb_assert_print(const char *msg, const char *cond, const char *file, uintmax_t line, const char *func) {
|
|
fprintf(stderr, "=== %s ===\n"
|
|
" Assertion: %s\n"
|
|
" File: %s:%ju\n"
|
|
" Function: %s\n", msg, cond, file, line, func);
|
|
}
|
|
// #######################################################
|
|
// ## eb_port.h
|
|
// #######################################################
|
|
|
|
#include <stddef.h>
|
|
#include <stdbool.h>
|
|
|
|
typedef struct eb_port *eb_port;
|
|
|
|
eb_port eb_port_create();
|
|
eb_port eb_port_retain(eb_port p);
|
|
void eb_port_release(eb_port p);
|
|
|
|
void eb_port_signal(eb_port p);
|
|
bool eb_port_wait(eb_port p, eb_nsec timeout);
|
|
// #######################################################
|
|
// ## eb_port.c
|
|
// #######################################################
|
|
|
|
#include <stdlib.h>
|
|
#include <assert.h>
|
|
#include <errno.h>
|
|
#include <string.h>
|
|
// #######################################################
|
|
// ## eb_sys.h
|
|
// #######################################################
|
|
|
|
#include <stddef.h>
|
|
|
|
#if __MACH__
|
|
#define EB_SYS_DARWIN 1
|
|
#elif __linux__
|
|
#define EB_SYS_LINUX 1
|
|
#else
|
|
// #error Unsupported system
|
|
#endif
|
|
|
|
/* ## Variables */
|
|
/* Returns the number of logical cores on the machine. _init must be called for this to be valid! */
|
|
size_t eb_sys_ncores;
|
|
|
|
/* ## Functions */
|
|
void eb_sys_init();
|
|
// #######################################################
|
|
// ## eb_sys.c
|
|
// #######################################################
|
|
|
|
// #######################################################
|
|
// ## eb_atomic.h
|
|
// #######################################################
|
|
|
|
|
|
#define eb_atomic_add(ptr, delta) __sync_add_and_fetch(ptr, delta) /* Returns the new value */
|
|
#define eb_atomic_compare_and_swap(ptr, old, new) __sync_bool_compare_and_swap(ptr, old, new)
|
|
#define eb_atomic_barrier() __sync_synchronize()
|
|
|
|
#if EB_SYS_DARWIN
|
|
#include <mach/mach.h>
|
|
#elif EB_SYS_LINUX
|
|
#include <unistd.h>
|
|
#endif
|
|
|
|
size_t ncores() {
|
|
#if EB_SYS_DARWIN
|
|
host_basic_info_data_t info;
|
|
mach_msg_type_number_t count = HOST_BASIC_INFO_COUNT;
|
|
kern_return_t r = host_info(mach_host_self(), HOST_BASIC_INFO, (host_info_t)&info, &count);
|
|
eb_assert_or_recover(r == KERN_SUCCESS, return 0);
|
|
eb_assert_or_recover(count == HOST_BASIC_INFO_COUNT, return 0);
|
|
eb_assert_or_recover(info.logical_cpu > 0 && info.logical_cpu <= SIZE_MAX, return 0);
|
|
|
|
return (size_t)info.logical_cpu;
|
|
#elif EB_SYS_LINUX
|
|
long ncores = sysconf(_SC_NPROCESSORS_ONLN);
|
|
eb_assert_or_recover(ncores > 0 && ncores <= SIZE_MAX, return 0);
|
|
|
|
return (size_t)ncores;
|
|
#endif
|
|
}
|
|
|
|
void eb_sys_init() {
|
|
if (!eb_sys_ncores) {
|
|
eb_atomic_compare_and_swap(&eb_sys_ncores, 0, ncores());
|
|
}
|
|
}
|
|
#if EB_SYS_DARWIN
|
|
#include <mach/mach.h>
|
|
#elif EB_SYS_LINUX
|
|
#include <time.h>
|
|
#include <semaphore.h>
|
|
#endif
|
|
// #######################################################
|
|
// ## eb_spinlock.h
|
|
// #######################################################
|
|
|
|
#include <stdbool.h>
|
|
#include <sched.h>
|
|
|
|
/* ## Types */
|
|
typedef int eb_spinlock;
|
|
#define EB_SPINLOCK_INIT 0
|
|
|
|
/* ## Functions */
|
|
#define eb_spinlock_try(l) eb_atomic_compare_and_swap(l, 0, 1)
|
|
|
|
#define eb_spinlock_lock(l) ({ \
|
|
if (eb_sys_ncores > 1) { \
|
|
while (!eb_spinlock_try(l)); \
|
|
} else { \
|
|
while (!eb_spinlock_try(l)) { \
|
|
sched_yield(); \
|
|
} \
|
|
} \
|
|
})
|
|
|
|
#define eb_spinlock_unlock(l) eb_atomic_compare_and_swap(l, 1, 0)
|
|
|
|
//#define eb_spinlock_try(l) __sync_lock_test_and_set(l, 1) == 0
|
|
//#define eb_spinlock_lock(l) while (!eb_spinlock_try(l))
|
|
//#define eb_spinlock_unlock(l) __sync_lock_release(l)
|
|
//
|
|
//typedef OSSpinLock eb_spinlock;
|
|
//#define eb_spinlock_try(l) OSSpinLockTry(l)
|
|
//#define eb_spinlock_lock(l) OSSpinLockLock(l)
|
|
//#define eb_spinlock_unlock(l) OSSpinLockUnlock(l)
|
|
// #######################################################
|
|
// ## eb_time.h
|
|
// #######################################################
|
|
|
|
|
|
/* Returns the number of nanoseconds since an arbitrary point in time (usually the machine's boot time) */
|
|
eb_nsec eb_time_now();
|
|
// #######################################################
|
|
// ## eb_time.c
|
|
// #######################################################
|
|
|
|
#include <stdint.h>
|
|
#include <stdlib.h>
|
|
#if EB_SYS_DARWIN
|
|
#include <mach/mach_time.h>
|
|
#elif EB_SYS_LINUX
|
|
#include <time.h>
|
|
#endif
|
|
|
|
eb_nsec eb_time_now() {
|
|
#if EB_SYS_DARWIN
|
|
/* Initialize k_timebase_info, thread-safely */
|
|
static mach_timebase_info_t k_timebase_info = NULL;
|
|
if (!k_timebase_info) {
|
|
mach_timebase_info_t timebase_info = malloc(sizeof(*timebase_info));
|
|
kern_return_t r = mach_timebase_info(timebase_info);
|
|
eb_assert_or_recover(r == KERN_SUCCESS, return 0);
|
|
|
|
/* Make sure the writes to 'timebase_info' are complete before we assign k_timebase_info */
|
|
eb_atomic_barrier();
|
|
|
|
if (!eb_atomic_compare_and_swap(&k_timebase_info, NULL, timebase_info)) {
|
|
free(timebase_info);
|
|
timebase_info = NULL;
|
|
}
|
|
}
|
|
|
|
return ((mach_absolute_time() * k_timebase_info->numer) / k_timebase_info->denom);
|
|
#elif EB_SYS_LINUX
|
|
struct timespec ts;
|
|
int r = clock_gettime(CLOCK_MONOTONIC, &ts);
|
|
eb_assert_or_recover(!r, return 0);
|
|
|
|
return ((uint64_t)ts.tv_sec * eb_nsec_per_sec) + ts.tv_nsec;
|
|
#endif
|
|
}
|
|
|
|
#define PORT_POOL_CAP 0x10
|
|
static eb_spinlock g_port_pool_lock = EB_SPINLOCK_INIT;
|
|
static eb_port g_port_pool[PORT_POOL_CAP];
|
|
static size_t g_port_pool_len = 0;
|
|
|
|
struct eb_port {
|
|
unsigned int retain_count;
|
|
bool sem_valid;
|
|
bool signaled;
|
|
#if EB_SYS_DARWIN
|
|
semaphore_t sem;
|
|
#elif EB_SYS_LINUX
|
|
sem_t sem;
|
|
#endif
|
|
};
|
|
|
|
static void eb_port_free(eb_port p) {
|
|
/* Allowing p==NULL so that this function can be called unconditionally on failure from eb_port_create() */
|
|
if (!p) {
|
|
return;
|
|
}
|
|
|
|
bool added_to_pool = false;
|
|
if (p->sem_valid) {
|
|
/* Determine whether we should clear the reset the port because we're going to try adding the port to our pool. */
|
|
bool reset = false;
|
|
eb_spinlock_lock(&g_port_pool_lock);
|
|
reset = (g_port_pool_len < PORT_POOL_CAP);
|
|
eb_spinlock_unlock(&g_port_pool_lock);
|
|
|
|
if (reset) {
|
|
eb_port_wait(p, eb_nsec_zero);
|
|
}
|
|
|
|
/* Now that the port's reset, add it to the pool as long as it'll still fit. */
|
|
eb_spinlock_lock(&g_port_pool_lock);
|
|
if (g_port_pool_len < PORT_POOL_CAP) {
|
|
g_port_pool[g_port_pool_len] = p;
|
|
g_port_pool_len++;
|
|
added_to_pool = true;
|
|
}
|
|
eb_spinlock_unlock(&g_port_pool_lock);
|
|
|
|
/* If we couldn't add the port to the pool, destroy the underlying semaphore. */
|
|
if (!added_to_pool) {
|
|
#if EB_SYS_DARWIN
|
|
kern_return_t r = semaphore_destroy(mach_task_self(), p->sem);
|
|
eb_assert_or_recover(r == KERN_SUCCESS, eb_no_op);
|
|
#elif EB_SYS_LINUX
|
|
int r = sem_destroy(&p->sem);
|
|
eb_assert_or_recover(!r, eb_no_op);
|
|
#endif
|
|
|
|
p->sem_valid = false;
|
|
}
|
|
}
|
|
|
|
if (!added_to_pool) {
|
|
free(p);
|
|
p = NULL;
|
|
}
|
|
}
|
|
|
|
eb_port eb_port_create() {
|
|
eb_port p = NULL;
|
|
|
|
/* First try to pop a port out of the pool */
|
|
eb_spinlock_lock(&g_port_pool_lock);
|
|
if (g_port_pool_len) {
|
|
g_port_pool_len--;
|
|
p = g_port_pool[g_port_pool_len];
|
|
}
|
|
eb_spinlock_unlock(&g_port_pool_lock);
|
|
|
|
if (p) {
|
|
/* We successfully popped a port out of the pool */
|
|
eb_assert_or_bail(!p->retain_count, "Sanity-check failed");
|
|
} else {
|
|
/* We couldn't get a port out of the pool */
|
|
/* Using calloc so that bytes are zeroed */
|
|
p = calloc(1, sizeof(*p));
|
|
eb_assert_or_recover(p, goto failed);
|
|
|
|
/* Create the semaphore */
|
|
#if EB_SYS_DARWIN
|
|
kern_return_t r = semaphore_create(mach_task_self(), &p->sem, SYNC_POLICY_FIFO, 0);
|
|
eb_assert_or_recover(r == KERN_SUCCESS, goto failed);
|
|
#elif EB_SYS_LINUX
|
|
int r = sem_init(&p->sem, 0, 0);
|
|
eb_assert_or_recover(!r, goto failed);
|
|
#endif
|
|
}
|
|
|
|
p->sem_valid = true;
|
|
p->retain_count = 1;
|
|
return p;
|
|
failed: {
|
|
eb_port_free(p);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
eb_port eb_port_retain(eb_port p) {
|
|
assert(p);
|
|
eb_atomic_add(&p->retain_count, 1);
|
|
return p;
|
|
}
|
|
|
|
void eb_port_release(eb_port p) {
|
|
assert(p);
|
|
if (eb_atomic_add(&p->retain_count, -1) == 0) {
|
|
eb_port_free(p);
|
|
}
|
|
}
|
|
|
|
void eb_port_signal(eb_port p) {
|
|
assert(p);
|
|
|
|
if (eb_atomic_compare_and_swap(&p->signaled, false, true)) {
|
|
#if EB_SYS_DARWIN
|
|
kern_return_t r = semaphore_signal(p->sem);
|
|
eb_assert_or_recover(r == KERN_SUCCESS, eb_no_op);
|
|
#elif EB_SYS_LINUX
|
|
int r = sem_post(&p->sem);
|
|
eb_assert_or_recover(!r, eb_no_op);
|
|
#endif
|
|
}
|
|
}
|
|
|
|
bool eb_port_wait(eb_port p, eb_nsec timeout) {
|
|
assert(p);
|
|
|
|
bool result = false;
|
|
if (timeout == eb_nsec_zero) {
|
|
/* ## Non-blocking */
|
|
#if EB_SYS_DARWIN
|
|
kern_return_t r = semaphore_timedwait(p->sem, (mach_timespec_t){0, 0});
|
|
eb_assert_or_recover(r == KERN_SUCCESS || r == KERN_OPERATION_TIMED_OUT, eb_no_op);
|
|
|
|
result = (r == KERN_SUCCESS);
|
|
#elif EB_SYS_LINUX
|
|
int r = 0;
|
|
while ((r = sem_trywait(&p->sem)) == -1 && errno == EINTR);
|
|
eb_assert_or_recover(!r || (r == -1 && errno == EAGAIN), eb_no_op);
|
|
|
|
result = !r;
|
|
#endif
|
|
} else if (timeout == eb_nsec_forever) {
|
|
/* ## Blocking */
|
|
#if EB_SYS_DARWIN
|
|
kern_return_t r;
|
|
while ((r = semaphore_wait(p->sem)) == KERN_ABORTED);
|
|
eb_assert_or_recover(r == KERN_SUCCESS, eb_no_op);
|
|
|
|
result = (r == KERN_SUCCESS);
|
|
#elif EB_SYS_LINUX
|
|
int r;
|
|
while ((r = sem_wait(&p->sem)) == -1 && errno == EINTR);
|
|
eb_assert_or_recover(!r, eb_no_op);
|
|
|
|
result = !r;
|
|
#endif
|
|
} else {
|
|
/* ## Actual timeout */
|
|
eb_nsec start_time = eb_time_now();
|
|
eb_nsec remaining_timeout = timeout;
|
|
for (;;) {
|
|
#if EB_SYS_DARWIN
|
|
/* This needs to be in a loop because semaphore_timedwait() can return KERN_ABORTED, e.g. if the process receives a signal. */
|
|
mach_timespec_t ts = {.tv_sec = (unsigned int)(remaining_timeout / eb_nsec_per_sec), .tv_nsec = (clock_res_t)(remaining_timeout % eb_nsec_per_sec)};
|
|
kern_return_t r = semaphore_timedwait(p->sem, ts);
|
|
eb_assert_or_recover(r == KERN_SUCCESS || r == KERN_OPERATION_TIMED_OUT || r == KERN_ABORTED, eb_no_op);
|
|
|
|
if (r == KERN_SUCCESS) {
|
|
result = true;
|
|
break;
|
|
}
|
|
#elif EB_SYS_LINUX
|
|
/* Because sem_timedwait() uses the system's _REALTIME clock instead of the _MONOTONIC clock, we'll time out when
|
|
the system's time changes. For that reason, we check for the timeout case ourself (instead of relying on errno
|
|
after calling sem_timedwait()) condition ourself, using our own monotonic clock APIs (eb_time_now()), and
|
|
restart sem_timedwait() if we determine independently that we haven't timed-out. */
|
|
struct timespec ts;
|
|
int r = clock_gettime(CLOCK_REALTIME, &ts);
|
|
eb_assert_or_recover(!r, break);
|
|
|
|
ts.tv_sec += (remaining_timeout / eb_nsec_per_sec);
|
|
ts.tv_nsec += (remaining_timeout % eb_nsec_per_sec);
|
|
r = sem_timedwait(&p->sem, &ts);
|
|
/* The allowed return cases are: success (r==0), timed-out (r==-1, errno==ETIMEDOUT), (r==-1, errno==EINTR) */
|
|
eb_assert_or_recover(!r || (r == -1 && (errno == ETIMEDOUT || errno == EINTR)), break);
|
|
|
|
/* If we acquired the semaphore, set our flag and break! */
|
|
if (!r) {
|
|
result = true;
|
|
break;
|
|
}
|
|
#endif
|
|
|
|
/* Determine whether we timed-out, and if not, update 'remaining_timeout' with the amount of time to go. */
|
|
eb_nsec elapsed = eb_time_now() - start_time;
|
|
if (elapsed < timeout) {
|
|
remaining_timeout = timeout - elapsed;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (result) {
|
|
assert(eb_atomic_compare_and_swap(&p->signaled, true, false));
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
#pragma mark - Types -
|
|
typedef struct {
|
|
eb_spinlock lock;
|
|
size_t cap;
|
|
size_t len;
|
|
eb_port *ports;
|
|
} *port_list;
|
|
|
|
static inline void port_list_free(port_list l);
|
|
|
|
/* Creates a new empty list */
|
|
static inline port_list port_list_alloc(size_t cap) {
|
|
assert(cap > 0);
|
|
|
|
port_list result = malloc(sizeof(*result));
|
|
eb_assert_or_recover(result, goto failed);
|
|
|
|
result->lock = EB_SPINLOCK_INIT;
|
|
result->cap = cap;
|
|
result->len = 0;
|
|
result->ports = malloc(cap * sizeof(*(result->ports)));
|
|
eb_assert_or_recover(result->ports, goto failed);
|
|
|
|
return result;
|
|
failed: {
|
|
port_list_free(result);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
/* Releases every port in the list, and frees the list itself */
|
|
static inline void port_list_free(port_list l) {
|
|
/* Intentionally allowing l==NULL */
|
|
if (!l) {
|
|
return;
|
|
}
|
|
|
|
/* Release each port in our list */
|
|
size_t i;
|
|
for (i = 0; i < l->len; i++) {
|
|
eb_port_release(l->ports[i]);
|
|
}
|
|
|
|
free(l->ports);
|
|
l->ports = NULL;
|
|
|
|
free(l);
|
|
l = NULL;
|
|
}
|
|
|
|
/* Add a port to the end of the list, expanding the buffer as necessary */
|
|
static inline void port_list_add(port_list l, eb_port p) {
|
|
assert(l);
|
|
assert(p);
|
|
|
|
/* First retain the port! */
|
|
eb_port_retain(p);
|
|
|
|
eb_spinlock_lock(&l->lock);
|
|
/* Sanity-check that the list's length is less than its capacity */
|
|
eb_assert_or_bail(l->len <= l->cap, "Sanity check failed");
|
|
|
|
/* Expand the list's buffer if it's full */
|
|
if (l->len == l->cap) {
|
|
l->cap *= 2;
|
|
// TODO: reimplement as a linked list, where the port nodes are just on the stacks of the _select_list() calls. that way the number of ports is unbounded, and we don't have to allocate anything on the heap!
|
|
l->ports = realloc(l->ports, l->cap * sizeof(*(l->ports)));
|
|
eb_assert_or_bail(l->ports, "Allocation failed");
|
|
}
|
|
|
|
l->ports[l->len] = p;
|
|
l->len++;
|
|
eb_spinlock_unlock(&l->lock);
|
|
}
|
|
|
|
/* Remove the first occurence of 'p' in the list. Returns whether a port was actually removed. */
|
|
static inline bool port_list_rm(port_list l, eb_port p) {
|
|
assert(l);
|
|
assert(p);
|
|
|
|
bool result = false;
|
|
eb_spinlock_lock(&l->lock);
|
|
/* Sanity-check that the list's length is less than its capacity */
|
|
eb_assert_or_bail(l->len <= l->cap, "Sanity-check failed");
|
|
|
|
/* Search for first occurence of the given port. If we find it, release it and move the last port in the list into the hole. */
|
|
size_t i;
|
|
for (i = 0; i < l->len; i++) {
|
|
if (l->ports[i] == p) {
|
|
/* Move the last element in the port list into the now-vacant spot */
|
|
l->ports[i] = l->ports[l->len-1];
|
|
/* Decrement the buffer length */
|
|
l->len--;
|
|
result = true;
|
|
break;
|
|
}
|
|
}
|
|
eb_spinlock_unlock(&l->lock);
|
|
|
|
if (result) {
|
|
/* Release the port, but do so outside of the spinlock because releasing does some stuff that might not be quick. */
|
|
eb_port_release(p);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/* Signal the first port in the list that isn't 'ignore' */
|
|
static inline void port_list_signal_first(const port_list l, eb_port ignore) {
|
|
assert(l);
|
|
|
|
eb_port p = NULL;
|
|
eb_spinlock_lock(&l->lock);
|
|
size_t i;
|
|
for (i = 0; i < l->len; i++) {
|
|
if (l->ports[i] != ignore) {
|
|
p = eb_port_retain(l->ports[i]);
|
|
break;
|
|
}
|
|
}
|
|
eb_spinlock_unlock(&l->lock);
|
|
|
|
if (p) {
|
|
eb_port_signal(p);
|
|
eb_port_release(p);
|
|
p = NULL;
|
|
}
|
|
}
|
|
|
|
enum {
|
|
/* Buffered/unbuffered channel states */
|
|
chanstate_open,
|
|
chanstate_closed,
|
|
/* Unbuffered channel states */
|
|
chanstate_send,
|
|
chanstate_recv,
|
|
chanstate_ack,
|
|
chanstate_done,
|
|
chanstate_cancelled
|
|
}; typedef int32_t chanstate;
|
|
|
|
typedef struct {
|
|
eb_chan_op *const *ops;
|
|
size_t nops;
|
|
bool *cleanup_ops;
|
|
|
|
eb_nsec timeout;
|
|
eb_port port;
|
|
} do_state;
|
|
|
|
struct eb_chan {
|
|
unsigned int retain_count;
|
|
eb_spinlock lock;
|
|
chanstate state;
|
|
|
|
port_list sends;
|
|
port_list recvs;
|
|
|
|
/* Buffered ivars */
|
|
size_t buf_cap;
|
|
size_t buf_len;
|
|
size_t buf_idx;
|
|
const void **buf;
|
|
|
|
/* Unbuffered ivars */
|
|
const do_state *unbuf_state;
|
|
eb_chan_op *unbuf_op;
|
|
eb_port unbuf_port;
|
|
};
|
|
|
|
#pragma mark - Channel creation/lifecycle -
|
|
static inline void eb_chan_free(eb_chan c) {
|
|
/* Intentionally allowing c==NULL so that this function can be called from eb_chan_create() */
|
|
if (!c) {
|
|
return;
|
|
}
|
|
|
|
if (c->buf_cap) {
|
|
/* ## Buffered */
|
|
free(c->buf);
|
|
c->buf = NULL;
|
|
}
|
|
|
|
port_list_free(c->recvs);
|
|
c->recvs = NULL;
|
|
|
|
port_list_free(c->sends);
|
|
c->sends = NULL;
|
|
|
|
free(c);
|
|
c = NULL;
|
|
}
|
|
|
|
eb_chan eb_chan_create(size_t buf_cap) {
|
|
static const size_t k_init_buf_cap = 16;
|
|
|
|
/* Initialize eb_sys so that eb_sys_ncores is valid. */
|
|
eb_sys_init();
|
|
|
|
/* Using calloc so that the bytes are zeroed. */
|
|
eb_chan c = calloc(1, sizeof(*c));
|
|
eb_assert_or_recover(c, goto failed);
|
|
|
|
c->retain_count = 1;
|
|
c->lock = EB_SPINLOCK_INIT;
|
|
c->state = chanstate_open;
|
|
|
|
c->sends = port_list_alloc(k_init_buf_cap);
|
|
eb_assert_or_recover(c->sends, goto failed);
|
|
c->recvs = port_list_alloc(k_init_buf_cap);
|
|
eb_assert_or_recover(c->recvs, goto failed);
|
|
|
|
if (buf_cap) {
|
|
/* ## Buffered */
|
|
c->buf_cap = buf_cap;
|
|
c->buf_len = 0;
|
|
c->buf_idx = 0;
|
|
c->buf = malloc(c->buf_cap * sizeof(*(c->buf)));
|
|
eb_assert_or_recover(c->buf, goto failed);
|
|
} else {
|
|
/* ## Unbuffered */
|
|
c->unbuf_state = NULL;
|
|
c->unbuf_op = NULL;
|
|
c->unbuf_port = NULL;
|
|
}
|
|
|
|
/* Issue a memory barrier since we didn't have the lock acquired for our set up (and this channel could theoretically
|
|
be passed to another thread without a barrier, and that'd be bad news...) */
|
|
eb_atomic_barrier();
|
|
|
|
return c;
|
|
failed: {
|
|
eb_chan_free(c);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
eb_chan eb_chan_retain(eb_chan c) {
|
|
assert(c);
|
|
eb_atomic_add(&c->retain_count, 1);
|
|
return c;
|
|
}
|
|
|
|
void eb_chan_release(eb_chan c) {
|
|
assert(c);
|
|
if (eb_atomic_add(&c->retain_count, -1) == 0) {
|
|
eb_chan_free(c);
|
|
}
|
|
}
|
|
|
|
#pragma mark - Channel closing -
|
|
eb_chan_res eb_chan_close(eb_chan c) {
|
|
assert(c);
|
|
|
|
eb_chan_res result = eb_chan_res_stalled;
|
|
while (result == eb_chan_res_stalled) {
|
|
eb_port signal_port = NULL;
|
|
eb_spinlock_lock(&c->lock);
|
|
if (c->state == chanstate_open) {
|
|
c->state = chanstate_closed;
|
|
result = eb_chan_res_ok;
|
|
} else if (c->state == chanstate_closed) {
|
|
result = eb_chan_res_closed;
|
|
} else if (c->state == chanstate_send || c->state == chanstate_recv) {
|
|
if (c->unbuf_port) {
|
|
signal_port = eb_port_retain(c->unbuf_port);
|
|
}
|
|
c->state = chanstate_closed;
|
|
result = eb_chan_res_ok;
|
|
}
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
/* Wake up the send/recv */
|
|
if (signal_port) {
|
|
eb_port_signal(signal_port);
|
|
eb_port_release(signal_port);
|
|
signal_port = NULL;
|
|
}
|
|
}
|
|
|
|
if (result == eb_chan_res_ok) {
|
|
/* Wake up the sends/recvs so that they see the channel's now closed */
|
|
port_list_signal_first(c->sends, NULL);
|
|
port_list_signal_first(c->recvs, NULL);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
#pragma mark - Getters -
|
|
size_t eb_chan_buf_cap(eb_chan c) {
|
|
assert(c);
|
|
return c->buf_cap;
|
|
}
|
|
|
|
size_t eb_chan_buf_len(eb_chan c) {
|
|
assert(c);
|
|
|
|
/* buf_len is only valid if the channel's buffered */
|
|
if (!c->buf_cap) {
|
|
return 0;
|
|
}
|
|
|
|
size_t r = 0;
|
|
eb_spinlock_lock(&c->lock);
|
|
r = c->buf_len;
|
|
eb_spinlock_unlock(&c->lock);
|
|
return r;
|
|
}
|
|
|
|
#pragma mark - Performing operations -
|
|
enum {
|
|
op_result_complete, /* The op completed and the caller should return */
|
|
op_result_next, /* The op couldn't make any progress and the caller should move on to the next op */
|
|
op_result_retry, /* The channel's busy and we should try the op again */
|
|
}; typedef unsigned int op_result;
|
|
|
|
static inline void cleanup_ops(const do_state *state) {
|
|
assert(state);
|
|
|
|
size_t i;
|
|
for (i = 0; i < state->nops; i++) {
|
|
if (state->cleanup_ops[i]) {
|
|
eb_chan_op *op = state->ops[i];
|
|
eb_chan c = op->chan;
|
|
bool signal_send = false;
|
|
bool signal_recv = false;
|
|
eb_spinlock_lock(&c->lock);
|
|
if (c->state == chanstate_send && c->unbuf_op == op) {
|
|
/* 'op' was in the process of an unbuffered send on the channel, but no recv had arrived
|
|
yet, so reset state to _open. */
|
|
c->state = chanstate_open;
|
|
signal_send = true;
|
|
} else if (c->state == chanstate_recv && c->unbuf_op == op) {
|
|
/* 'op' was in the process of an unbuffered recv on the channel, but no send had arrived
|
|
yet, so reset state to _open. */
|
|
c->state = chanstate_open;
|
|
signal_recv = true;
|
|
} else if (c->state == chanstate_ack && c->unbuf_op == op) {
|
|
/* A counterpart acknowledged 'op' but, but 'op' isn't the one that completed in our select() call, so we're cancelling. */
|
|
c->state = chanstate_cancelled;
|
|
}
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
if (signal_send) {
|
|
port_list_signal_first(c->sends, state->port);
|
|
}
|
|
|
|
if (signal_recv) {
|
|
port_list_signal_first(c->recvs, state->port);
|
|
}
|
|
|
|
state->cleanup_ops[i] = false;
|
|
}
|
|
}
|
|
}
|
|
|
|
static inline op_result send_buf(const do_state *state, eb_chan_op *op, size_t op_idx) {
|
|
assert(state);
|
|
assert(op);
|
|
assert(op->chan);
|
|
|
|
eb_chan c = op->chan;
|
|
op_result result = op_result_next;
|
|
|
|
if (c->buf_len < c->buf_cap || c->state == chanstate_closed) {
|
|
/* It looks like our channel's in an acceptable state, so try to acquire the lock */
|
|
if (eb_spinlock_try(&c->lock)) {
|
|
/* Sanity-check the channel's state */
|
|
eb_assert_or_bail(c->state == chanstate_open || c->state == chanstate_closed, "Invalid channel state");
|
|
|
|
bool signal_recv = false;
|
|
if (c->state == chanstate_closed) {
|
|
/* ## Sending, buffered, channel closed */
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_closed;
|
|
result = op_result_complete;
|
|
} else if (c->buf_len < c->buf_cap) {
|
|
/* ## Sending, buffered, channel open, buffer has space */
|
|
/* Notify the channel's recvs if our buffer is going from empty to non-empty */
|
|
signal_recv = (!c->buf_len);
|
|
/* Add the value to the buffer */
|
|
size_t idx = (c->buf_idx + c->buf_len) % c->buf_cap;
|
|
c->buf[idx] = op->val;
|
|
c->buf_len++;
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_ok;
|
|
result = op_result_complete;
|
|
}
|
|
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
if (signal_recv) {
|
|
port_list_signal_first(c->recvs, state->port);
|
|
}
|
|
} else {
|
|
result = op_result_retry;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static inline op_result recv_buf(const do_state *state, eb_chan_op *op, size_t op_idx) {
|
|
assert(state);
|
|
assert(op);
|
|
assert(op->chan);
|
|
|
|
eb_chan c = op->chan;
|
|
op_result result = op_result_next;
|
|
|
|
if (c->buf_len || c->state == chanstate_closed) {
|
|
if (eb_spinlock_try(&c->lock)) {
|
|
/* Sanity-check the channel's state */
|
|
eb_assert_or_bail(c->state == chanstate_open || c->state == chanstate_closed, "Invalid channel state");
|
|
|
|
bool signal_send = false;
|
|
if (c->buf_len) {
|
|
/* ## Receiving, buffered, buffer non-empty */
|
|
/* Notify the channel's sends if our buffer is going from full to not-full */
|
|
signal_send = (c->buf_len == c->buf_cap);
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_ok;
|
|
op->val = c->buf[c->buf_idx];
|
|
result = op_result_complete;
|
|
/* Update chan's buffer. (Updating buf_idx needs to come after we use it!) */
|
|
c->buf_len--;
|
|
c->buf_idx = (c->buf_idx + 1) % c->buf_cap;
|
|
} else if (c->state == chanstate_closed) {
|
|
/* ## Receiving, buffered, buffer empty, channel closed */
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_closed;
|
|
op->val = NULL;
|
|
result = op_result_complete;
|
|
}
|
|
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
if (signal_send) {
|
|
port_list_signal_first(c->sends, state->port);
|
|
}
|
|
} else {
|
|
result = op_result_retry;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static inline op_result send_unbuf(const do_state *state, eb_chan_op *op, size_t op_idx) {
|
|
assert(state);
|
|
assert(op);
|
|
assert(op->chan);
|
|
|
|
eb_chan c = op->chan;
|
|
op_result result = op_result_next;
|
|
|
|
if ((c->state == chanstate_open && state->timeout != eb_nsec_zero) ||
|
|
c->state == chanstate_closed ||
|
|
(c->state == chanstate_send && c->unbuf_op == op) ||
|
|
(c->state == chanstate_recv && c->unbuf_state != state) ||
|
|
(c->state == chanstate_ack && c->unbuf_op == op)) {
|
|
|
|
/* It looks like our channel's in an acceptable state, so try to acquire the lock */
|
|
if (eb_spinlock_try(&c->lock)) {
|
|
/* Reset the cleanup state since we acquired the lock and are actually getting a look at the channel's state */
|
|
state->cleanup_ops[op_idx] = false;
|
|
|
|
bool signal_recv = false;
|
|
if (c->state == chanstate_open && state->timeout != eb_nsec_zero) {
|
|
c->state = chanstate_send;
|
|
c->unbuf_state = state;
|
|
c->unbuf_op = op;
|
|
c->unbuf_port = state->port;
|
|
/* We need to cleanup after this since we put it in the _send state! */
|
|
state->cleanup_ops[op_idx] = true;
|
|
/* Signal a recv since one of them can continue now */
|
|
signal_recv = true;
|
|
} else if (c->state == chanstate_closed) {
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_closed;
|
|
result = op_result_complete;
|
|
} else if (c->state == chanstate_send && c->unbuf_op == op) {
|
|
/* We own the send op that's in progress, so assign chan's unbuf_port */
|
|
/* Verify that the unbuf_state matches our 'id' parameter. If this assertion fails, it means there's likely
|
|
one eb_chan_op being shared by multiple threads, which isn't allowed. */
|
|
eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid");
|
|
/* Assign the port */
|
|
c->unbuf_port = state->port;
|
|
/* We need to cleanup after this since we put it in the _send state! */
|
|
state->cleanup_ops[op_idx] = true;
|
|
} else if (c->state == chanstate_recv && c->unbuf_state != state) {
|
|
/* We verified (immediately above) that the recv isn't part of the same op pool (we can't do unbuffered
|
|
sends/recvs from the same _do() call) */
|
|
|
|
/* Sanity check -- make sure the op is a recv */
|
|
eb_assert_or_bail(!c->unbuf_op->send, "Op isn't a recv as expected");
|
|
|
|
/* Set the recv op's value. This needs to happen before we transition out of the _recv state, otherwise the unbuf_op may no longer be valid! */
|
|
c->unbuf_op->val = op->val;
|
|
/* Acknowledge the receive */
|
|
c->state = chanstate_ack;
|
|
/* Get a reference to the unbuf_port that needs to be signaled */
|
|
eb_port signal_port = (c->unbuf_port ? eb_port_retain(c->unbuf_port) : NULL);
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
/* Wake up the recv */
|
|
if (signal_port) {
|
|
eb_port_signal(signal_port);
|
|
eb_port_release(signal_port);
|
|
signal_port = NULL;
|
|
}
|
|
|
|
/* We have to cleanup all our ops here to cancel any outstanding unbuffered send/recvs, to avoid a deadlock
|
|
situation that arises when another _do() is waiting on our _do() to complete, but it never does because
|
|
we're about to wait for the other _do() to complete. */
|
|
cleanup_ops(state);
|
|
|
|
for (;;) {
|
|
if (*((volatile chanstate *)&c->state) != chanstate_ack) {
|
|
eb_spinlock_lock(&c->lock);
|
|
if (c->state == chanstate_done) {
|
|
/* Reset the channel state back to _open */
|
|
c->state = chanstate_open;
|
|
/* We reset our state to _open, so signal a send since it can proceed now. */
|
|
signal_recv = true;
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_ok;
|
|
result = op_result_complete;
|
|
/* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside
|
|
of our large if-statement. */
|
|
break;
|
|
} else if (c->state == chanstate_cancelled) {
|
|
/* Reset the channel state back to _open */
|
|
c->state = chanstate_open;
|
|
/* As long as we're not polling, we should try the op again */
|
|
if (state->timeout != eb_nsec_zero) {
|
|
result = op_result_retry;
|
|
} else {
|
|
/* We're not telling the caller to retry, so signal a send since it can proceed now. */
|
|
signal_recv = true;
|
|
}
|
|
/* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside
|
|
of our large if-statement. */
|
|
break;
|
|
}
|
|
eb_spinlock_unlock(&c->lock);
|
|
} else if (eb_sys_ncores == 1) {
|
|
/* On uniprocessor machines, yield to the scheduler because we can't continue until another
|
|
thread updates the channel's state. */
|
|
sched_yield();
|
|
}
|
|
}
|
|
} else if (c->state == chanstate_ack && c->unbuf_op == op) {
|
|
/* A recv acknowledged our send! */
|
|
/* Verify that the unbuf_state matches our 'id' parameter. If this assertion fails, it means there's likely
|
|
one eb_chan_op being shared by multiple threads, which isn't allowed. */
|
|
eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid");
|
|
/* A recv is polling for chan's state to change, so update it to signal that we're done sending! */
|
|
c->state = chanstate_done;
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_ok;
|
|
result = op_result_complete;
|
|
}
|
|
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
if (signal_recv) {
|
|
port_list_signal_first(c->recvs, state->port);
|
|
}
|
|
} else {
|
|
result = op_result_retry;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static inline op_result recv_unbuf(const do_state *state, eb_chan_op *op, size_t op_idx) {
|
|
assert(state);
|
|
assert(op);
|
|
assert(op->chan);
|
|
|
|
eb_chan c = op->chan;
|
|
op_result result = op_result_next;
|
|
|
|
if ((c->state == chanstate_open && state->timeout != eb_nsec_zero) ||
|
|
c->state == chanstate_closed ||
|
|
(c->state == chanstate_send && c->unbuf_state != state) ||
|
|
(c->state == chanstate_recv && c->unbuf_op == op) ||
|
|
(c->state == chanstate_ack && c->unbuf_op == op)) {
|
|
|
|
/* It looks like our channel's in an acceptable state, so try to acquire the lock */
|
|
if (eb_spinlock_try(&c->lock)) {
|
|
/* Reset the cleanup state since we acquired the lock and are actually getting a look at the channel's state */
|
|
state->cleanup_ops[op_idx] = false;
|
|
|
|
bool signal_send = false;
|
|
if (c->state == chanstate_open && state->timeout != eb_nsec_zero) {
|
|
c->state = chanstate_recv;
|
|
c->unbuf_state = state;
|
|
c->unbuf_op = op;
|
|
c->unbuf_port = state->port;
|
|
/* We need to cleanup after this since we put it in the _send state! */
|
|
state->cleanup_ops[op_idx] = true;
|
|
/* Signal a send since one of them can continue now */
|
|
signal_send = true;
|
|
} else if (c->state == chanstate_closed) {
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_closed;
|
|
op->val = NULL;
|
|
result = op_result_complete;
|
|
} else if (c->state == chanstate_send && c->unbuf_state != state) {
|
|
/* We verified (immediately above) that the send isn't part of the same op pool (we can't do unbuffered
|
|
sends/recvs from the same _do() call) */
|
|
|
|
/* Sanity check -- make sure the op is a send */
|
|
eb_assert_or_bail(c->unbuf_op->send, "Op isn't a send as expected");
|
|
|
|
/* Get the op's value. This needs to happen before we transition out of the _send state, otherwise the unbuf_op may no longer be valid! */
|
|
op->val = c->unbuf_op->val;
|
|
/* Acknowledge the send */
|
|
c->state = chanstate_ack;
|
|
/* Get a reference to the unbuf_port that needs to be signaled */
|
|
eb_port signal_port = (c->unbuf_port ? eb_port_retain(c->unbuf_port) : NULL);
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
/* Wake up the send */
|
|
if (signal_port) {
|
|
eb_port_signal(signal_port);
|
|
eb_port_release(signal_port);
|
|
signal_port = NULL;
|
|
}
|
|
|
|
/* We have to cleanup all our ops here to cancel any outstanding unbuffered send/recvs, to avoid a deadlock
|
|
situation that arises when another _do() is waiting on our _do() to complete, but it never does because
|
|
we're about to wait for the other _do() to complete. */
|
|
cleanup_ops(state);
|
|
|
|
for (;;) {
|
|
if (*((volatile chanstate *)&c->state) != chanstate_ack) {
|
|
eb_spinlock_lock(&c->lock);
|
|
if (c->state == chanstate_done) {
|
|
/* Reset the channel state back to _open */
|
|
c->state = chanstate_open;
|
|
/* We reset our state to _open, so signal a recv since it can proceed now. */
|
|
signal_send = true;
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_ok;
|
|
result = op_result_complete;
|
|
/* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside
|
|
of our large if-statement. */
|
|
break;
|
|
} else if (c->state == chanstate_cancelled) {
|
|
/* Reset the channel state back to _open */
|
|
c->state = chanstate_open;
|
|
/* As long as we're not polling, we should try the op again */
|
|
if (state->timeout != eb_nsec_zero) {
|
|
result = op_result_retry;
|
|
} else {
|
|
/* We're not telling the caller to retry, so signal a recv since it can proceed now. */
|
|
signal_send = true;
|
|
}
|
|
/* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside
|
|
of our large if-statement. */
|
|
break;
|
|
}
|
|
eb_spinlock_unlock(&c->lock);
|
|
} else if (eb_sys_ncores == 1) {
|
|
/* On uniprocessor machines, yield to the scheduler because we can't continue until another
|
|
thread updates the channel's state. */
|
|
sched_yield();
|
|
}
|
|
}
|
|
} else if (c->state == chanstate_recv && c->unbuf_op == op) {
|
|
/* We own the recv op that's in progress, so assign chan's unbuf_port */
|
|
/* Verify that the _recv_id matches our 'id' parameter. If this assertion fails, it means there's likely
|
|
one eb_chan_op being shared by multiple threads, which isn't allowed. */
|
|
eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid");
|
|
/* Assign the port */
|
|
c->unbuf_port = state->port;
|
|
/* We need to cleanup after this since we put it in the _send state! */
|
|
state->cleanup_ops[op_idx] = true;
|
|
} else if (c->state == chanstate_ack && c->unbuf_op == op) {
|
|
/* A send acknowledged our recv! */
|
|
/* Verify that the unbuf_state matches our 'id' parameter. If this assertion fails, it means there's likely
|
|
one eb_chan_op being shared by multiple threads, which isn't allowed. */
|
|
eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid");
|
|
/* A send is polling for chan's state to change, so update it to signal that we're done sending! */
|
|
c->state = chanstate_done;
|
|
/* Set our op's state and our return value */
|
|
op->res = eb_chan_res_ok;
|
|
result = op_result_complete;
|
|
}
|
|
|
|
eb_spinlock_unlock(&c->lock);
|
|
|
|
if (signal_send) {
|
|
port_list_signal_first(c->sends, state->port);
|
|
}
|
|
} else {
|
|
result = op_result_retry;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
static inline op_result try_op(const do_state *state, eb_chan_op *op, size_t op_idx) {
|
|
assert(state);
|
|
assert(op);
|
|
|
|
eb_chan c = op->chan;
|
|
if (c) {
|
|
if (op->send) {
|
|
/* ## Send */
|
|
return (c->buf_cap ? send_buf(state, op, op_idx) : send_unbuf(state, op, op_idx));
|
|
} else {
|
|
/* ## Receive */
|
|
return (c->buf_cap ? recv_buf(state, op, op_idx) : recv_unbuf(state, op, op_idx));
|
|
}
|
|
}
|
|
return op_result_next;
|
|
}
|
|
|
|
eb_chan_res eb_chan_send(eb_chan c, const void *val) {
|
|
eb_chan_op op = eb_chan_op_send(c, val);
|
|
eb_assert_or_bail(eb_chan_select(eb_nsec_forever, &op) == &op, "Invalid select() return value");
|
|
return op.res;
|
|
}
|
|
|
|
eb_chan_res eb_chan_try_send(eb_chan c, const void *val) {
|
|
eb_chan_op op = eb_chan_op_send(c, val);
|
|
eb_chan_op *r = eb_chan_select(eb_nsec_zero, &op);
|
|
eb_assert_or_bail(r == NULL || r == &op, "Invalid select() return value");
|
|
return (r ? op.res : eb_chan_res_stalled);
|
|
}
|
|
|
|
eb_chan_res eb_chan_recv(eb_chan c, const void **val) {
|
|
eb_chan_op op = eb_chan_op_recv(c);
|
|
eb_assert_or_bail(eb_chan_select(eb_nsec_forever, &op) == &op, "Invalid select() return value");
|
|
if (op.res == eb_chan_res_ok && val) {
|
|
*val = op.val;
|
|
}
|
|
return op.res;
|
|
}
|
|
|
|
eb_chan_res eb_chan_try_recv(eb_chan c, const void **val) {
|
|
eb_chan_op op = eb_chan_op_recv(c);
|
|
eb_chan_op *r = eb_chan_select(eb_nsec_zero, &op);
|
|
eb_assert_or_bail(r == NULL || r == &op, "Invalid select() return value");
|
|
if (r && op.res == eb_chan_res_ok && val) {
|
|
*val = op.val;
|
|
}
|
|
return (r ? op.res : eb_chan_res_stalled);
|
|
}
|
|
|
|
#pragma mark - Multiplexing -
|
|
#define next_idx(nops, delta, idx) (delta == 1 && idx == nops-1 ? 0 : ((delta == -1 && idx == 0) ? nops-1 : idx+delta))
|
|
eb_chan_op *eb_chan_select_list(eb_nsec timeout, eb_chan_op *const ops[], size_t nops) {
|
|
assert(!nops || ops);
|
|
|
|
const size_t k_attempt_multiplier = (eb_sys_ncores == 1 ? 1 : 500);
|
|
eb_nsec start_time = 0;
|
|
size_t idx_start = 0;
|
|
int8_t idx_delta = 0;
|
|
if (nops > 1) {
|
|
/* Assign idx_start/idx_delta, which control the op pseudo-randomization */
|
|
start_time = eb_time_now();
|
|
idx_start = (start_time/1000)%nops;
|
|
idx_delta = (!((start_time/10000)%2) ? 1 : -1);
|
|
}
|
|
|
|
bool co[nops];
|
|
memset(co, 0, sizeof(co));
|
|
|
|
eb_chan_op *result = NULL;
|
|
do_state state = {
|
|
.ops = ops,
|
|
.nops = nops,
|
|
.cleanup_ops = co,
|
|
.timeout = timeout,
|
|
.port = NULL};
|
|
|
|
if (timeout == eb_nsec_zero) {
|
|
/* ## timeout == 0: try every op exactly once; if none of them can proceed, return NULL. */
|
|
size_t i, idx;
|
|
for (i = 0, idx = idx_start; i < nops; i++, idx = next_idx(nops, idx_delta, idx)) {
|
|
eb_chan_op *op = ops[idx];
|
|
op_result r;
|
|
while ((r = try_op(&state, op, idx)) == op_result_retry) {
|
|
if (eb_sys_ncores == 1) {
|
|
/* On uniprocessor machines, yield to the scheduler because we can't continue until another
|
|
thread updates the channel's state. */
|
|
sched_yield();
|
|
}
|
|
}
|
|
|
|
/* If the op completed, we need to exit! */
|
|
if (r == op_result_complete) {
|
|
result = op;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
} else {
|
|
/* ## timeout != 0 */
|
|
if (timeout != eb_nsec_forever && !start_time) {
|
|
start_time = eb_time_now();
|
|
}
|
|
|
|
for (;;) {
|
|
/* ## Fast path: loop over our operations to see if one of them was able to send/receive. (If not,
|
|
we'll enter the slow path where we put our thread to sleep until we're signaled.) */
|
|
size_t i, idx;
|
|
for (i = 0, idx = idx_start; i < k_attempt_multiplier*nops; i++, idx = next_idx(nops, idx_delta, idx)) {
|
|
eb_chan_op *op = ops[idx];
|
|
op_result r = try_op(&state, op, idx);
|
|
/* If the op completed, we need to exit! */
|
|
if (r == op_result_complete) {
|
|
result = op;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
|
|
/* ## Slow path: we weren't able to find an operation that could send/receive, so we'll create a
|
|
port to receive notifications on and put this thread to sleep until someone wakes us up. */
|
|
if (!state.port) {
|
|
/* Create our port that we'll attach to channels so that we can be notified when events occur. */
|
|
state.port = eb_port_create();
|
|
eb_assert_or_recover(state.port, goto cleanup);
|
|
|
|
/* Register our port for the appropriate notifications on every channel. */
|
|
/* This adds 'port' to the channel's sends/recvs (depending on the op), which we clean up at the
|
|
end of this function. */
|
|
size_t i;
|
|
for (i = 0; i < nops; i++) {
|
|
eb_chan_op *op = ops[i];
|
|
eb_chan c = op->chan;
|
|
if (c) {
|
|
port_list_add((op->send ? c->sends : c->recvs), state.port);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Before we go to sleep, call try_op() for every op until we get a non-busy return value. This way we'll ensure
|
|
that no op is actually able to be performed, and we'll also ensure that 'port' is registered as the 'unbuf_port'
|
|
for the necessary channels. */
|
|
// size_t i, idx;
|
|
for (i = 0, idx = idx_start; i < nops; i++, idx = next_idx(nops, idx_delta, idx)) {
|
|
eb_chan_op *op = ops[idx];
|
|
op_result r;
|
|
while ((r = try_op(&state, op, idx)) == op_result_retry) {
|
|
if (eb_sys_ncores == 1) {
|
|
/* On uniprocessor machines, yield to the scheduler because we can't continue until another
|
|
thread updates the channel's state. */
|
|
sched_yield();
|
|
}
|
|
}
|
|
|
|
/* If the op completed, we need to exit! */
|
|
if (r == op_result_complete) {
|
|
result = op;
|
|
goto cleanup;
|
|
}
|
|
}
|
|
|
|
eb_nsec wait_timeout = eb_nsec_forever;
|
|
if (timeout != eb_nsec_forever) {
|
|
/* If we have a timeout, determine how much time has elapsed, because we may have timed-out. */
|
|
eb_nsec elapsed = eb_time_now() - start_time;
|
|
/* Check if we timed-out */
|
|
if (elapsed < timeout) {
|
|
wait_timeout = timeout - elapsed;
|
|
} else {
|
|
goto cleanup;
|
|
}
|
|
}
|
|
|
|
/* Put our thread to sleep until someone alerts us of an event */
|
|
eb_port_wait(state.port, wait_timeout);
|
|
}
|
|
}
|
|
|
|
/* Cleanup! */
|
|
cleanup: {
|
|
if (state.port) {
|
|
size_t i;
|
|
for (i = 0; i < nops; i++) {
|
|
eb_chan_op *op = ops[i];
|
|
eb_chan c = op->chan;
|
|
if (c) {
|
|
port_list ports = (op->send ? c->sends : c->recvs);
|
|
port_list_rm(ports, state.port);
|
|
port_list_signal_first(ports, state.port);
|
|
}
|
|
}
|
|
}
|
|
|
|
cleanup_ops(&state);
|
|
|
|
if (state.port) {
|
|
eb_port_release(state.port);
|
|
state.port = NULL;
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
#endif /* EB_CHAN_H */ |