From 11eee3201fbae78c3b9ad6ada24f9d8c23ea9f2b Mon Sep 17 00:00:00 2001 From: htmk Date: Fri, 8 Feb 2019 20:26:02 -0200 Subject: [PATCH] Part 2 of making the lib async - New event struct mimiking C structs - changed c channel library to eb_chan - changed API to something more palatable --- chan/eb_chan.h | 1430 +++++++++++++++++++++++++++++++++++++++++++++++ event/goEvent.h | 90 ++- extern.go | 32 +- hook.go | 101 ++-- test/main.go | 30 +- 5 files changed, 1602 insertions(+), 81 deletions(-) create mode 100644 chan/eb_chan.h diff --git a/chan/eb_chan.h b/chan/eb_chan.h new file mode 100644 index 0000000..be02996 --- /dev/null +++ b/chan/eb_chan.h @@ -0,0 +1,1430 @@ +// ####################################################### +// ## Generated by merge_src from the following files: +// ## eb_assert.c +// ## eb_assert.h +// ## eb_atomic.h +// ## eb_chan.c +// ## eb_chan.h +// ## eb_nsec.h +// ## eb_port.c +// ## eb_port.h +// ## eb_spinlock.h +// ## eb_sys.c +// ## eb_sys.h +// ## eb_time.c +// ## eb_time.h +// ####################################################### + +// ####################################################### +// ## eb_chan.h +// ####################################################### + +#ifndef EB_CHAN_H +#define EB_CHAN_H + +#include +#include +// ####################################################### +// ## eb_nsec.h +// ####################################################### + +#include + +typedef uint64_t eb_nsec; /* Units of nanoseconds */ +#define eb_nsec_zero UINT64_C(0) +#define eb_nsec_forever UINT64_MAX +#define eb_nsec_per_sec UINT64_C(1000000000) + +/* ## Types */ +typedef enum { + eb_chan_res_ok, /* Success */ + eb_chan_res_closed, /* Failed because the channel is closed */ + eb_chan_res_stalled, /* Failed because the send/recv couldn't proceed without blocking (applies to _try_send()/_try_recv()) */ +} eb_chan_res; + +typedef struct eb_chan *eb_chan; +typedef struct { + eb_chan chan; /* The applicable channel, where NULL channels block forever */ + bool send; /* True if sending, false if receiving */ + eb_chan_res res; /* _ok if the op completed due to a successful send/recv operation, _closed if the op completed because the channel is closed. */ + const void *val; /* The value to be sent/the value that was received */ +} eb_chan_op; + +/* ## Channel creation/lifecycle */ +eb_chan eb_chan_create(size_t buf_cap); +eb_chan eb_chan_retain(eb_chan c); +void eb_chan_release(eb_chan c); + +/* ## Channel closing */ +/* Returns _ok on success, or _closed if the channel was already closed. */ +eb_chan_res eb_chan_close(eb_chan c); + +/* ## Getters */ +size_t eb_chan_buf_cap(eb_chan c); +size_t eb_chan_buf_len(eb_chan c); + +/* ## Sending/receiving */ +/* Send/receive a value on a channel (where _send()/_recv() are blocking and _try_send()/_try_recv() are non-blocking) */ +eb_chan_res eb_chan_send(eb_chan c, const void *val); +eb_chan_res eb_chan_try_send(eb_chan c, const void *val); +eb_chan_res eb_chan_recv(eb_chan c, const void **val); +eb_chan_res eb_chan_try_recv(eb_chan c, const void **val); + +/* ## Multiplexing */ +/* _select_list() performs at most one of the operations in the supplied list, and returns the one that was performed. + It returns NULL if no operation was performed before the timeout. */ +eb_chan_op *eb_chan_select_list(eb_nsec timeout, eb_chan_op *const ops[], size_t nops); + +/* _select() is a convenience macro that wraps _select_list() to avoid having to manually create an array of ops on the stack. + For example: + eb_chan_op op1 = eb_chan_op_send(c1, NULL); + eb_chan_op op2 = eb_chan_op_recv(c2); + eb_chan_op *result = eb_chan_select(timeout, &op1, &op2); + ... +*/ +#define eb_chan_select(timeout, ...) ({ \ + eb_chan_op *const eb_chan_select_ops[] = {__VA_ARGS__}; \ + eb_chan_select_list(timeout, eb_chan_select_ops, (sizeof(eb_chan_select_ops) / sizeof(*eb_chan_select_ops))); \ +}) + +/* Return initialized send/recv ops for use with _select() */ +static inline eb_chan_op eb_chan_op_send(eb_chan c, const void *val) { + return (eb_chan_op){.chan = c, .send = true, .res = eb_chan_res_closed, .val = val}; +} + +static inline eb_chan_op eb_chan_op_recv(eb_chan c) { + return (eb_chan_op){.chan = c, .send = false, .res = eb_chan_res_closed, .val = NULL}; +} + +// ####################################################### +// ## eb_chan.c +// ####################################################### +#include +#include +#include +#include +#include +// ####################################################### +// ## eb_assert.h +// ####################################################### + +#include +#include + +#define eb_no_op + +#define eb_assert_or_recover(cond, action) ({ \ + if (!(cond)) { \ + eb_assert_print("Assertion failed", #cond, __FILE__, (uintmax_t)__LINE__, __PRETTY_FUNCTION__); \ + action; \ + } \ +}) + +#define eb_assert_or_bail(cond, msg) ({ \ + if (!(cond)) { \ + eb_assert_print(msg, #cond, __FILE__, (uintmax_t)__LINE__, __PRETTY_FUNCTION__); \ + abort(); \ + } \ +}) + +void eb_assert_print(const char *msg, const char *cond, const char *file, uintmax_t line, const char *func); +// ####################################################### +// ## eb_assert.c +// ####################################################### + +#include + +void eb_assert_print(const char *msg, const char *cond, const char *file, uintmax_t line, const char *func) { + fprintf(stderr, "=== %s ===\n" + " Assertion: %s\n" + " File: %s:%ju\n" + " Function: %s\n", msg, cond, file, line, func); +} +// ####################################################### +// ## eb_port.h +// ####################################################### + +#include +#include + +typedef struct eb_port *eb_port; + +eb_port eb_port_create(); +eb_port eb_port_retain(eb_port p); +void eb_port_release(eb_port p); + +void eb_port_signal(eb_port p); +bool eb_port_wait(eb_port p, eb_nsec timeout); +// ####################################################### +// ## eb_port.c +// ####################################################### + +#include +#include +#include +#include +// ####################################################### +// ## eb_sys.h +// ####################################################### + +#include + +#if __MACH__ + #define EB_SYS_DARWIN 1 +#elif __linux__ + #define EB_SYS_LINUX 1 +#else +// #error Unsupported system +#endif + +/* ## Variables */ +/* Returns the number of logical cores on the machine. _init must be called for this to be valid! */ +size_t eb_sys_ncores; + +/* ## Functions */ +void eb_sys_init(); +// ####################################################### +// ## eb_sys.c +// ####################################################### + +// ####################################################### +// ## eb_atomic.h +// ####################################################### + + +#define eb_atomic_add(ptr, delta) __sync_add_and_fetch(ptr, delta) /* Returns the new value */ +#define eb_atomic_compare_and_swap(ptr, old, new) __sync_bool_compare_and_swap(ptr, old, new) +#define eb_atomic_barrier() __sync_synchronize() + +#if EB_SYS_DARWIN + #include +#elif EB_SYS_LINUX + #include +#endif + +size_t ncores() { + #if EB_SYS_DARWIN + host_basic_info_data_t info; + mach_msg_type_number_t count = HOST_BASIC_INFO_COUNT; + kern_return_t r = host_info(mach_host_self(), HOST_BASIC_INFO, (host_info_t)&info, &count); + eb_assert_or_recover(r == KERN_SUCCESS, return 0); + eb_assert_or_recover(count == HOST_BASIC_INFO_COUNT, return 0); + eb_assert_or_recover(info.logical_cpu > 0 && info.logical_cpu <= SIZE_MAX, return 0); + + return (size_t)info.logical_cpu; + #elif EB_SYS_LINUX + long ncores = sysconf(_SC_NPROCESSORS_ONLN); + eb_assert_or_recover(ncores > 0 && ncores <= SIZE_MAX, return 0); + + return (size_t)ncores; + #endif +} + +void eb_sys_init() { + if (!eb_sys_ncores) { + eb_atomic_compare_and_swap(&eb_sys_ncores, 0, ncores()); + } +} +#if EB_SYS_DARWIN + #include +#elif EB_SYS_LINUX + #include + #include +#endif +// ####################################################### +// ## eb_spinlock.h +// ####################################################### + +#include +#include + +/* ## Types */ +typedef int eb_spinlock; +#define EB_SPINLOCK_INIT 0 + +/* ## Functions */ +#define eb_spinlock_try(l) eb_atomic_compare_and_swap(l, 0, 1) + +#define eb_spinlock_lock(l) ({ \ + if (eb_sys_ncores > 1) { \ + while (!eb_spinlock_try(l)); \ + } else { \ + while (!eb_spinlock_try(l)) { \ + sched_yield(); \ + } \ + } \ +}) + +#define eb_spinlock_unlock(l) eb_atomic_compare_and_swap(l, 1, 0) + +//#define eb_spinlock_try(l) __sync_lock_test_and_set(l, 1) == 0 +//#define eb_spinlock_lock(l) while (!eb_spinlock_try(l)) +//#define eb_spinlock_unlock(l) __sync_lock_release(l) +// +//typedef OSSpinLock eb_spinlock; +//#define eb_spinlock_try(l) OSSpinLockTry(l) +//#define eb_spinlock_lock(l) OSSpinLockLock(l) +//#define eb_spinlock_unlock(l) OSSpinLockUnlock(l) +// ####################################################### +// ## eb_time.h +// ####################################################### + + +/* Returns the number of nanoseconds since an arbitrary point in time (usually the machine's boot time) */ +eb_nsec eb_time_now(); +// ####################################################### +// ## eb_time.c +// ####################################################### + +#include +#include +#if EB_SYS_DARWIN + #include +#elif EB_SYS_LINUX + #include +#endif + +eb_nsec eb_time_now() { +#if EB_SYS_DARWIN + /* Initialize k_timebase_info, thread-safely */ + static mach_timebase_info_t k_timebase_info = NULL; + if (!k_timebase_info) { + mach_timebase_info_t timebase_info = malloc(sizeof(*timebase_info)); + kern_return_t r = mach_timebase_info(timebase_info); + eb_assert_or_recover(r == KERN_SUCCESS, return 0); + + /* Make sure the writes to 'timebase_info' are complete before we assign k_timebase_info */ + eb_atomic_barrier(); + + if (!eb_atomic_compare_and_swap(&k_timebase_info, NULL, timebase_info)) { + free(timebase_info); + timebase_info = NULL; + } + } + + return ((mach_absolute_time() * k_timebase_info->numer) / k_timebase_info->denom); +#elif EB_SYS_LINUX + struct timespec ts; + int r = clock_gettime(CLOCK_MONOTONIC, &ts); + eb_assert_or_recover(!r, return 0); + + return ((uint64_t)ts.tv_sec * eb_nsec_per_sec) + ts.tv_nsec; +#endif +} + +#define PORT_POOL_CAP 0x10 +static eb_spinlock g_port_pool_lock = EB_SPINLOCK_INIT; +static eb_port g_port_pool[PORT_POOL_CAP]; +static size_t g_port_pool_len = 0; + +struct eb_port { + unsigned int retain_count; + bool sem_valid; + bool signaled; + #if EB_SYS_DARWIN + semaphore_t sem; + #elif EB_SYS_LINUX + sem_t sem; + #endif +}; + +static void eb_port_free(eb_port p) { + /* Allowing p==NULL so that this function can be called unconditionally on failure from eb_port_create() */ + if (!p) { + return; + } + + bool added_to_pool = false; + if (p->sem_valid) { + /* Determine whether we should clear the reset the port because we're going to try adding the port to our pool. */ + bool reset = false; + eb_spinlock_lock(&g_port_pool_lock); + reset = (g_port_pool_len < PORT_POOL_CAP); + eb_spinlock_unlock(&g_port_pool_lock); + + if (reset) { + eb_port_wait(p, eb_nsec_zero); + } + + /* Now that the port's reset, add it to the pool as long as it'll still fit. */ + eb_spinlock_lock(&g_port_pool_lock); + if (g_port_pool_len < PORT_POOL_CAP) { + g_port_pool[g_port_pool_len] = p; + g_port_pool_len++; + added_to_pool = true; + } + eb_spinlock_unlock(&g_port_pool_lock); + + /* If we couldn't add the port to the pool, destroy the underlying semaphore. */ + if (!added_to_pool) { + #if EB_SYS_DARWIN + kern_return_t r = semaphore_destroy(mach_task_self(), p->sem); + eb_assert_or_recover(r == KERN_SUCCESS, eb_no_op); + #elif EB_SYS_LINUX + int r = sem_destroy(&p->sem); + eb_assert_or_recover(!r, eb_no_op); + #endif + + p->sem_valid = false; + } + } + + if (!added_to_pool) { + free(p); + p = NULL; + } +} + +eb_port eb_port_create() { + eb_port p = NULL; + + /* First try to pop a port out of the pool */ + eb_spinlock_lock(&g_port_pool_lock); + if (g_port_pool_len) { + g_port_pool_len--; + p = g_port_pool[g_port_pool_len]; + } + eb_spinlock_unlock(&g_port_pool_lock); + + if (p) { + /* We successfully popped a port out of the pool */ + eb_assert_or_bail(!p->retain_count, "Sanity-check failed"); + } else { + /* We couldn't get a port out of the pool */ + /* Using calloc so that bytes are zeroed */ + p = calloc(1, sizeof(*p)); + eb_assert_or_recover(p, goto failed); + + /* Create the semaphore */ + #if EB_SYS_DARWIN + kern_return_t r = semaphore_create(mach_task_self(), &p->sem, SYNC_POLICY_FIFO, 0); + eb_assert_or_recover(r == KERN_SUCCESS, goto failed); + #elif EB_SYS_LINUX + int r = sem_init(&p->sem, 0, 0); + eb_assert_or_recover(!r, goto failed); + #endif + } + + p->sem_valid = true; + p->retain_count = 1; + return p; + failed: { + eb_port_free(p); + return NULL; + } +} + +eb_port eb_port_retain(eb_port p) { + assert(p); + eb_atomic_add(&p->retain_count, 1); + return p; +} + +void eb_port_release(eb_port p) { + assert(p); + if (eb_atomic_add(&p->retain_count, -1) == 0) { + eb_port_free(p); + } +} + +void eb_port_signal(eb_port p) { + assert(p); + + if (eb_atomic_compare_and_swap(&p->signaled, false, true)) { + #if EB_SYS_DARWIN + kern_return_t r = semaphore_signal(p->sem); + eb_assert_or_recover(r == KERN_SUCCESS, eb_no_op); + #elif EB_SYS_LINUX + int r = sem_post(&p->sem); + eb_assert_or_recover(!r, eb_no_op); + #endif + } +} + +bool eb_port_wait(eb_port p, eb_nsec timeout) { + assert(p); + + bool result = false; + if (timeout == eb_nsec_zero) { + /* ## Non-blocking */ + #if EB_SYS_DARWIN + kern_return_t r = semaphore_timedwait(p->sem, (mach_timespec_t){0, 0}); + eb_assert_or_recover(r == KERN_SUCCESS || r == KERN_OPERATION_TIMED_OUT, eb_no_op); + + result = (r == KERN_SUCCESS); + #elif EB_SYS_LINUX + int r = 0; + while ((r = sem_trywait(&p->sem)) == -1 && errno == EINTR); + eb_assert_or_recover(!r || (r == -1 && errno == EAGAIN), eb_no_op); + + result = !r; + #endif + } else if (timeout == eb_nsec_forever) { + /* ## Blocking */ + #if EB_SYS_DARWIN + kern_return_t r; + while ((r = semaphore_wait(p->sem)) == KERN_ABORTED); + eb_assert_or_recover(r == KERN_SUCCESS, eb_no_op); + + result = (r == KERN_SUCCESS); + #elif EB_SYS_LINUX + int r; + while ((r = sem_wait(&p->sem)) == -1 && errno == EINTR); + eb_assert_or_recover(!r, eb_no_op); + + result = !r; + #endif + } else { + /* ## Actual timeout */ + eb_nsec start_time = eb_time_now(); + eb_nsec remaining_timeout = timeout; + for (;;) { + #if EB_SYS_DARWIN + /* This needs to be in a loop because semaphore_timedwait() can return KERN_ABORTED, e.g. if the process receives a signal. */ + mach_timespec_t ts = {.tv_sec = (unsigned int)(remaining_timeout / eb_nsec_per_sec), .tv_nsec = (clock_res_t)(remaining_timeout % eb_nsec_per_sec)}; + kern_return_t r = semaphore_timedwait(p->sem, ts); + eb_assert_or_recover(r == KERN_SUCCESS || r == KERN_OPERATION_TIMED_OUT || r == KERN_ABORTED, eb_no_op); + + if (r == KERN_SUCCESS) { + result = true; + break; + } + #elif EB_SYS_LINUX + /* Because sem_timedwait() uses the system's _REALTIME clock instead of the _MONOTONIC clock, we'll time out when + the system's time changes. For that reason, we check for the timeout case ourself (instead of relying on errno + after calling sem_timedwait()) condition ourself, using our own monotonic clock APIs (eb_time_now()), and + restart sem_timedwait() if we determine independently that we haven't timed-out. */ + struct timespec ts; + int r = clock_gettime(CLOCK_REALTIME, &ts); + eb_assert_or_recover(!r, break); + + ts.tv_sec += (remaining_timeout / eb_nsec_per_sec); + ts.tv_nsec += (remaining_timeout % eb_nsec_per_sec); + r = sem_timedwait(&p->sem, &ts); + /* The allowed return cases are: success (r==0), timed-out (r==-1, errno==ETIMEDOUT), (r==-1, errno==EINTR) */ + eb_assert_or_recover(!r || (r == -1 && (errno == ETIMEDOUT || errno == EINTR)), break); + + /* If we acquired the semaphore, set our flag and break! */ + if (!r) { + result = true; + break; + } + #endif + + /* Determine whether we timed-out, and if not, update 'remaining_timeout' with the amount of time to go. */ + eb_nsec elapsed = eb_time_now() - start_time; + if (elapsed < timeout) { + remaining_timeout = timeout - elapsed; + } else { + break; + } + } + } + + if (result) { + assert(eb_atomic_compare_and_swap(&p->signaled, true, false)); + } + + return result; +} + +#pragma mark - Types - +typedef struct { + eb_spinlock lock; + size_t cap; + size_t len; + eb_port *ports; +} *port_list; + +static inline void port_list_free(port_list l); + +/* Creates a new empty list */ +static inline port_list port_list_alloc(size_t cap) { + assert(cap > 0); + + port_list result = malloc(sizeof(*result)); + eb_assert_or_recover(result, goto failed); + + result->lock = EB_SPINLOCK_INIT; + result->cap = cap; + result->len = 0; + result->ports = malloc(cap * sizeof(*(result->ports))); + eb_assert_or_recover(result->ports, goto failed); + + return result; + failed: { + port_list_free(result); + return NULL; + } +} + +/* Releases every port in the list, and frees the list itself */ +static inline void port_list_free(port_list l) { + /* Intentionally allowing l==NULL */ + if (!l) { + return; + } + + /* Release each port in our list */ + for (size_t i = 0; i < l->len; i++) { + eb_port_release(l->ports[i]); + } + + free(l->ports); + l->ports = NULL; + + free(l); + l = NULL; +} + +/* Add a port to the end of the list, expanding the buffer as necessary */ +static inline void port_list_add(port_list l, eb_port p) { + assert(l); + assert(p); + + /* First retain the port! */ + eb_port_retain(p); + + eb_spinlock_lock(&l->lock); + /* Sanity-check that the list's length is less than its capacity */ + eb_assert_or_bail(l->len <= l->cap, "Sanity check failed"); + + /* Expand the list's buffer if it's full */ + if (l->len == l->cap) { + l->cap *= 2; + // TODO: reimplement as a linked list, where the port nodes are just on the stacks of the _select_list() calls. that way the number of ports is unbounded, and we don't have to allocate anything on the heap! + l->ports = realloc(l->ports, l->cap * sizeof(*(l->ports))); + eb_assert_or_bail(l->ports, "Allocation failed"); + } + + l->ports[l->len] = p; + l->len++; + eb_spinlock_unlock(&l->lock); +} + +/* Remove the first occurence of 'p' in the list. Returns whether a port was actually removed. */ +static inline bool port_list_rm(port_list l, eb_port p) { + assert(l); + assert(p); + + bool result = false; + eb_spinlock_lock(&l->lock); + /* Sanity-check that the list's length is less than its capacity */ + eb_assert_or_bail(l->len <= l->cap, "Sanity-check failed"); + + /* Search for first occurence of the given port. If we find it, release it and move the last port in the list into the hole. */ + for (size_t i = 0; i < l->len; i++) { + if (l->ports[i] == p) { + /* Move the last element in the port list into the now-vacant spot */ + l->ports[i] = l->ports[l->len-1]; + /* Decrement the buffer length */ + l->len--; + result = true; + break; + } + } + eb_spinlock_unlock(&l->lock); + + if (result) { + /* Release the port, but do so outside of the spinlock because releasing does some stuff that might not be quick. */ + eb_port_release(p); + } + + return result; +} + +/* Signal the first port in the list that isn't 'ignore' */ +static inline void port_list_signal_first(const port_list l, eb_port ignore) { + assert(l); + + eb_port p = NULL; + eb_spinlock_lock(&l->lock); + for (size_t i = 0; i < l->len; i++) { + if (l->ports[i] != ignore) { + p = eb_port_retain(l->ports[i]); + break; + } + } + eb_spinlock_unlock(&l->lock); + + if (p) { + eb_port_signal(p); + eb_port_release(p); + p = NULL; + } +} + +enum { + /* Buffered/unbuffered channel states */ + chanstate_open, + chanstate_closed, + /* Unbuffered channel states */ + chanstate_send, + chanstate_recv, + chanstate_ack, + chanstate_done, + chanstate_cancelled +}; typedef int32_t chanstate; + +typedef struct { + eb_chan_op *const *ops; + size_t nops; + bool *cleanup_ops; + + eb_nsec timeout; + eb_port port; +} do_state; + +struct eb_chan { + unsigned int retain_count; + eb_spinlock lock; + chanstate state; + + port_list sends; + port_list recvs; + + /* Buffered ivars */ + size_t buf_cap; + size_t buf_len; + size_t buf_idx; + const void **buf; + + /* Unbuffered ivars */ + const do_state *unbuf_state; + eb_chan_op *unbuf_op; + eb_port unbuf_port; +}; + +#pragma mark - Channel creation/lifecycle - +static inline void eb_chan_free(eb_chan c) { + /* Intentionally allowing c==NULL so that this function can be called from eb_chan_create() */ + if (!c) { + return; + } + + if (c->buf_cap) { + /* ## Buffered */ + free(c->buf); + c->buf = NULL; + } + + port_list_free(c->recvs); + c->recvs = NULL; + + port_list_free(c->sends); + c->sends = NULL; + + free(c); + c = NULL; +} + +eb_chan eb_chan_create(size_t buf_cap) { + static const size_t k_init_buf_cap = 16; + + /* Initialize eb_sys so that eb_sys_ncores is valid. */ + eb_sys_init(); + + /* Using calloc so that the bytes are zeroed. */ + eb_chan c = calloc(1, sizeof(*c)); + eb_assert_or_recover(c, goto failed); + + c->retain_count = 1; + c->lock = EB_SPINLOCK_INIT; + c->state = chanstate_open; + + c->sends = port_list_alloc(k_init_buf_cap); + eb_assert_or_recover(c->sends, goto failed); + c->recvs = port_list_alloc(k_init_buf_cap); + eb_assert_or_recover(c->recvs, goto failed); + + if (buf_cap) { + /* ## Buffered */ + c->buf_cap = buf_cap; + c->buf_len = 0; + c->buf_idx = 0; + c->buf = malloc(c->buf_cap * sizeof(*(c->buf))); + eb_assert_or_recover(c->buf, goto failed); + } else { + /* ## Unbuffered */ + c->unbuf_state = NULL; + c->unbuf_op = NULL; + c->unbuf_port = NULL; + } + + /* Issue a memory barrier since we didn't have the lock acquired for our set up (and this channel could theoretically + be passed to another thread without a barrier, and that'd be bad news...) */ + eb_atomic_barrier(); + + return c; + failed: { + eb_chan_free(c); + return NULL; + } +} + +eb_chan eb_chan_retain(eb_chan c) { + assert(c); + eb_atomic_add(&c->retain_count, 1); + return c; +} + +void eb_chan_release(eb_chan c) { + assert(c); + if (eb_atomic_add(&c->retain_count, -1) == 0) { + eb_chan_free(c); + } +} + +#pragma mark - Channel closing - +eb_chan_res eb_chan_close(eb_chan c) { + assert(c); + + eb_chan_res result = eb_chan_res_stalled; + while (result == eb_chan_res_stalled) { + eb_port signal_port = NULL; + eb_spinlock_lock(&c->lock); + if (c->state == chanstate_open) { + c->state = chanstate_closed; + result = eb_chan_res_ok; + } else if (c->state == chanstate_closed) { + result = eb_chan_res_closed; + } else if (c->state == chanstate_send || c->state == chanstate_recv) { + if (c->unbuf_port) { + signal_port = eb_port_retain(c->unbuf_port); + } + c->state = chanstate_closed; + result = eb_chan_res_ok; + } + eb_spinlock_unlock(&c->lock); + + /* Wake up the send/recv */ + if (signal_port) { + eb_port_signal(signal_port); + eb_port_release(signal_port); + signal_port = NULL; + } + } + + if (result == eb_chan_res_ok) { + /* Wake up the sends/recvs so that they see the channel's now closed */ + port_list_signal_first(c->sends, NULL); + port_list_signal_first(c->recvs, NULL); + } + + return result; +} + +#pragma mark - Getters - +size_t eb_chan_buf_cap(eb_chan c) { + assert(c); + return c->buf_cap; +} + +size_t eb_chan_buf_len(eb_chan c) { + assert(c); + + /* buf_len is only valid if the channel's buffered */ + if (!c->buf_cap) { + return 0; + } + + size_t r = 0; + eb_spinlock_lock(&c->lock); + r = c->buf_len; + eb_spinlock_unlock(&c->lock); + return r; +} + +#pragma mark - Performing operations - +enum { + op_result_complete, /* The op completed and the caller should return */ + op_result_next, /* The op couldn't make any progress and the caller should move on to the next op */ + op_result_retry, /* The channel's busy and we should try the op again */ +}; typedef unsigned int op_result; + +static inline void cleanup_ops(const do_state *state) { + assert(state); + + for (size_t i = 0; i < state->nops; i++) { + if (state->cleanup_ops[i]) { + eb_chan_op *op = state->ops[i]; + eb_chan c = op->chan; + bool signal_send = false; + bool signal_recv = false; + eb_spinlock_lock(&c->lock); + if (c->state == chanstate_send && c->unbuf_op == op) { + /* 'op' was in the process of an unbuffered send on the channel, but no recv had arrived + yet, so reset state to _open. */ + c->state = chanstate_open; + signal_send = true; + } else if (c->state == chanstate_recv && c->unbuf_op == op) { + /* 'op' was in the process of an unbuffered recv on the channel, but no send had arrived + yet, so reset state to _open. */ + c->state = chanstate_open; + signal_recv = true; + } else if (c->state == chanstate_ack && c->unbuf_op == op) { + /* A counterpart acknowledged 'op' but, but 'op' isn't the one that completed in our select() call, so we're cancelling. */ + c->state = chanstate_cancelled; + } + eb_spinlock_unlock(&c->lock); + + if (signal_send) { + port_list_signal_first(c->sends, state->port); + } + + if (signal_recv) { + port_list_signal_first(c->recvs, state->port); + } + + state->cleanup_ops[i] = false; + } + } +} + +static inline op_result send_buf(const do_state *state, eb_chan_op *op, size_t op_idx) { + assert(state); + assert(op); + assert(op->chan); + + eb_chan c = op->chan; + op_result result = op_result_next; + + if (c->buf_len < c->buf_cap || c->state == chanstate_closed) { + /* It looks like our channel's in an acceptable state, so try to acquire the lock */ + if (eb_spinlock_try(&c->lock)) { + /* Sanity-check the channel's state */ + eb_assert_or_bail(c->state == chanstate_open || c->state == chanstate_closed, "Invalid channel state"); + + bool signal_recv = false; + if (c->state == chanstate_closed) { + /* ## Sending, buffered, channel closed */ + /* Set our op's state and our return value */ + op->res = eb_chan_res_closed; + result = op_result_complete; + } else if (c->buf_len < c->buf_cap) { + /* ## Sending, buffered, channel open, buffer has space */ + /* Notify the channel's recvs if our buffer is going from empty to non-empty */ + signal_recv = (!c->buf_len); + /* Add the value to the buffer */ + size_t idx = (c->buf_idx + c->buf_len) % c->buf_cap; + c->buf[idx] = op->val; + c->buf_len++; + /* Set our op's state and our return value */ + op->res = eb_chan_res_ok; + result = op_result_complete; + } + + eb_spinlock_unlock(&c->lock); + + if (signal_recv) { + port_list_signal_first(c->recvs, state->port); + } + } else { + result = op_result_retry; + } + } + + return result; +} + +static inline op_result recv_buf(const do_state *state, eb_chan_op *op, size_t op_idx) { + assert(state); + assert(op); + assert(op->chan); + + eb_chan c = op->chan; + op_result result = op_result_next; + + if (c->buf_len || c->state == chanstate_closed) { + if (eb_spinlock_try(&c->lock)) { + /* Sanity-check the channel's state */ + eb_assert_or_bail(c->state == chanstate_open || c->state == chanstate_closed, "Invalid channel state"); + + bool signal_send = false; + if (c->buf_len) { + /* ## Receiving, buffered, buffer non-empty */ + /* Notify the channel's sends if our buffer is going from full to not-full */ + signal_send = (c->buf_len == c->buf_cap); + /* Set our op's state and our return value */ + op->res = eb_chan_res_ok; + op->val = c->buf[c->buf_idx]; + result = op_result_complete; + /* Update chan's buffer. (Updating buf_idx needs to come after we use it!) */ + c->buf_len--; + c->buf_idx = (c->buf_idx + 1) % c->buf_cap; + } else if (c->state == chanstate_closed) { + /* ## Receiving, buffered, buffer empty, channel closed */ + /* Set our op's state and our return value */ + op->res = eb_chan_res_closed; + op->val = NULL; + result = op_result_complete; + } + + eb_spinlock_unlock(&c->lock); + + if (signal_send) { + port_list_signal_first(c->sends, state->port); + } + } else { + result = op_result_retry; + } + } + + return result; +} + +static inline op_result send_unbuf(const do_state *state, eb_chan_op *op, size_t op_idx) { + assert(state); + assert(op); + assert(op->chan); + + eb_chan c = op->chan; + op_result result = op_result_next; + + if ((c->state == chanstate_open && state->timeout != eb_nsec_zero) || + c->state == chanstate_closed || + (c->state == chanstate_send && c->unbuf_op == op) || + (c->state == chanstate_recv && c->unbuf_state != state) || + (c->state == chanstate_ack && c->unbuf_op == op)) { + + /* It looks like our channel's in an acceptable state, so try to acquire the lock */ + if (eb_spinlock_try(&c->lock)) { + /* Reset the cleanup state since we acquired the lock and are actually getting a look at the channel's state */ + state->cleanup_ops[op_idx] = false; + + bool signal_recv = false; + if (c->state == chanstate_open && state->timeout != eb_nsec_zero) { + c->state = chanstate_send; + c->unbuf_state = state; + c->unbuf_op = op; + c->unbuf_port = state->port; + /* We need to cleanup after this since we put it in the _send state! */ + state->cleanup_ops[op_idx] = true; + /* Signal a recv since one of them can continue now */ + signal_recv = true; + } else if (c->state == chanstate_closed) { + /* Set our op's state and our return value */ + op->res = eb_chan_res_closed; + result = op_result_complete; + } else if (c->state == chanstate_send && c->unbuf_op == op) { + /* We own the send op that's in progress, so assign chan's unbuf_port */ + /* Verify that the unbuf_state matches our 'id' parameter. If this assertion fails, it means there's likely + one eb_chan_op being shared by multiple threads, which isn't allowed. */ + eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid"); + /* Assign the port */ + c->unbuf_port = state->port; + /* We need to cleanup after this since we put it in the _send state! */ + state->cleanup_ops[op_idx] = true; + } else if (c->state == chanstate_recv && c->unbuf_state != state) { + /* We verified (immediately above) that the recv isn't part of the same op pool (we can't do unbuffered + sends/recvs from the same _do() call) */ + + /* Sanity check -- make sure the op is a recv */ + eb_assert_or_bail(!c->unbuf_op->send, "Op isn't a recv as expected"); + + /* Set the recv op's value. This needs to happen before we transition out of the _recv state, otherwise the unbuf_op may no longer be valid! */ + c->unbuf_op->val = op->val; + /* Acknowledge the receive */ + c->state = chanstate_ack; + /* Get a reference to the unbuf_port that needs to be signaled */ + eb_port signal_port = (c->unbuf_port ? eb_port_retain(c->unbuf_port) : NULL); + eb_spinlock_unlock(&c->lock); + + /* Wake up the recv */ + if (signal_port) { + eb_port_signal(signal_port); + eb_port_release(signal_port); + signal_port = NULL; + } + + /* We have to cleanup all our ops here to cancel any outstanding unbuffered send/recvs, to avoid a deadlock + situation that arises when another _do() is waiting on our _do() to complete, but it never does because + we're about to wait for the other _do() to complete. */ + cleanup_ops(state); + + for (;;) { + if (*((volatile chanstate *)&c->state) != chanstate_ack) { + eb_spinlock_lock(&c->lock); + if (c->state == chanstate_done) { + /* Reset the channel state back to _open */ + c->state = chanstate_open; + /* We reset our state to _open, so signal a send since it can proceed now. */ + signal_recv = true; + /* Set our op's state and our return value */ + op->res = eb_chan_res_ok; + result = op_result_complete; + /* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside + of our large if-statement. */ + break; + } else if (c->state == chanstate_cancelled) { + /* Reset the channel state back to _open */ + c->state = chanstate_open; + /* As long as we're not polling, we should try the op again */ + if (state->timeout != eb_nsec_zero) { + result = op_result_retry; + } else { + /* We're not telling the caller to retry, so signal a send since it can proceed now. */ + signal_recv = true; + } + /* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside + of our large if-statement. */ + break; + } + eb_spinlock_unlock(&c->lock); + } else if (eb_sys_ncores == 1) { + /* On uniprocessor machines, yield to the scheduler because we can't continue until another + thread updates the channel's state. */ + sched_yield(); + } + } + } else if (c->state == chanstate_ack && c->unbuf_op == op) { + /* A recv acknowledged our send! */ + /* Verify that the unbuf_state matches our 'id' parameter. If this assertion fails, it means there's likely + one eb_chan_op being shared by multiple threads, which isn't allowed. */ + eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid"); + /* A recv is polling for chan's state to change, so update it to signal that we're done sending! */ + c->state = chanstate_done; + /* Set our op's state and our return value */ + op->res = eb_chan_res_ok; + result = op_result_complete; + } + + eb_spinlock_unlock(&c->lock); + + if (signal_recv) { + port_list_signal_first(c->recvs, state->port); + } + } else { + result = op_result_retry; + } + } + + return result; +} + +static inline op_result recv_unbuf(const do_state *state, eb_chan_op *op, size_t op_idx) { + assert(state); + assert(op); + assert(op->chan); + + eb_chan c = op->chan; + op_result result = op_result_next; + + if ((c->state == chanstate_open && state->timeout != eb_nsec_zero) || + c->state == chanstate_closed || + (c->state == chanstate_send && c->unbuf_state != state) || + (c->state == chanstate_recv && c->unbuf_op == op) || + (c->state == chanstate_ack && c->unbuf_op == op)) { + + /* It looks like our channel's in an acceptable state, so try to acquire the lock */ + if (eb_spinlock_try(&c->lock)) { + /* Reset the cleanup state since we acquired the lock and are actually getting a look at the channel's state */ + state->cleanup_ops[op_idx] = false; + + bool signal_send = false; + if (c->state == chanstate_open && state->timeout != eb_nsec_zero) { + c->state = chanstate_recv; + c->unbuf_state = state; + c->unbuf_op = op; + c->unbuf_port = state->port; + /* We need to cleanup after this since we put it in the _send state! */ + state->cleanup_ops[op_idx] = true; + /* Signal a send since one of them can continue now */ + signal_send = true; + } else if (c->state == chanstate_closed) { + /* Set our op's state and our return value */ + op->res = eb_chan_res_closed; + op->val = NULL; + result = op_result_complete; + } else if (c->state == chanstate_send && c->unbuf_state != state) { + /* We verified (immediately above) that the send isn't part of the same op pool (we can't do unbuffered + sends/recvs from the same _do() call) */ + + /* Sanity check -- make sure the op is a send */ + eb_assert_or_bail(c->unbuf_op->send, "Op isn't a send as expected"); + + /* Get the op's value. This needs to happen before we transition out of the _send state, otherwise the unbuf_op may no longer be valid! */ + op->val = c->unbuf_op->val; + /* Acknowledge the send */ + c->state = chanstate_ack; + /* Get a reference to the unbuf_port that needs to be signaled */ + eb_port signal_port = (c->unbuf_port ? eb_port_retain(c->unbuf_port) : NULL); + eb_spinlock_unlock(&c->lock); + + /* Wake up the send */ + if (signal_port) { + eb_port_signal(signal_port); + eb_port_release(signal_port); + signal_port = NULL; + } + + /* We have to cleanup all our ops here to cancel any outstanding unbuffered send/recvs, to avoid a deadlock + situation that arises when another _do() is waiting on our _do() to complete, but it never does because + we're about to wait for the other _do() to complete. */ + cleanup_ops(state); + + for (;;) { + if (*((volatile chanstate *)&c->state) != chanstate_ack) { + eb_spinlock_lock(&c->lock); + if (c->state == chanstate_done) { + /* Reset the channel state back to _open */ + c->state = chanstate_open; + /* We reset our state to _open, so signal a recv since it can proceed now. */ + signal_send = true; + /* Set our op's state and our return value */ + op->res = eb_chan_res_ok; + result = op_result_complete; + /* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside + of our large if-statement. */ + break; + } else if (c->state == chanstate_cancelled) { + /* Reset the channel state back to _open */ + c->state = chanstate_open; + /* As long as we're not polling, we should try the op again */ + if (state->timeout != eb_nsec_zero) { + result = op_result_retry; + } else { + /* We're not telling the caller to retry, so signal a recv since it can proceed now. */ + signal_send = true; + } + /* Breaking here so that we skip the _unlock() call, because we unlock the spinlock outside + of our large if-statement. */ + break; + } + eb_spinlock_unlock(&c->lock); + } else if (eb_sys_ncores == 1) { + /* On uniprocessor machines, yield to the scheduler because we can't continue until another + thread updates the channel's state. */ + sched_yield(); + } + } + } else if (c->state == chanstate_recv && c->unbuf_op == op) { + /* We own the recv op that's in progress, so assign chan's unbuf_port */ + /* Verify that the _recv_id matches our 'id' parameter. If this assertion fails, it means there's likely + one eb_chan_op being shared by multiple threads, which isn't allowed. */ + eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid"); + /* Assign the port */ + c->unbuf_port = state->port; + /* We need to cleanup after this since we put it in the _send state! */ + state->cleanup_ops[op_idx] = true; + } else if (c->state == chanstate_ack && c->unbuf_op == op) { + /* A send acknowledged our recv! */ + /* Verify that the unbuf_state matches our 'id' parameter. If this assertion fails, it means there's likely + one eb_chan_op being shared by multiple threads, which isn't allowed. */ + eb_assert_or_bail(c->unbuf_state == state, "unbuf_state invalid"); + /* A send is polling for chan's state to change, so update it to signal that we're done sending! */ + c->state = chanstate_done; + /* Set our op's state and our return value */ + op->res = eb_chan_res_ok; + result = op_result_complete; + } + + eb_spinlock_unlock(&c->lock); + + if (signal_send) { + port_list_signal_first(c->sends, state->port); + } + } else { + result = op_result_retry; + } + } + + return result; +} + +static inline op_result try_op(const do_state *state, eb_chan_op *op, size_t op_idx) { + assert(state); + assert(op); + + eb_chan c = op->chan; + if (c) { + if (op->send) { + /* ## Send */ + return (c->buf_cap ? send_buf(state, op, op_idx) : send_unbuf(state, op, op_idx)); + } else { + /* ## Receive */ + return (c->buf_cap ? recv_buf(state, op, op_idx) : recv_unbuf(state, op, op_idx)); + } + } + return op_result_next; +} + +eb_chan_res eb_chan_send(eb_chan c, const void *val) { + eb_chan_op op = eb_chan_op_send(c, val); + eb_assert_or_bail(eb_chan_select(eb_nsec_forever, &op) == &op, "Invalid select() return value"); + return op.res; +} + +eb_chan_res eb_chan_try_send(eb_chan c, const void *val) { + eb_chan_op op = eb_chan_op_send(c, val); + eb_chan_op *r = eb_chan_select(eb_nsec_zero, &op); + eb_assert_or_bail(r == NULL || r == &op, "Invalid select() return value"); + return (r ? op.res : eb_chan_res_stalled); +} + +eb_chan_res eb_chan_recv(eb_chan c, const void **val) { + eb_chan_op op = eb_chan_op_recv(c); + eb_assert_or_bail(eb_chan_select(eb_nsec_forever, &op) == &op, "Invalid select() return value"); + if (op.res == eb_chan_res_ok && val) { + *val = op.val; + } + return op.res; +} + +eb_chan_res eb_chan_try_recv(eb_chan c, const void **val) { + eb_chan_op op = eb_chan_op_recv(c); + eb_chan_op *r = eb_chan_select(eb_nsec_zero, &op); + eb_assert_or_bail(r == NULL || r == &op, "Invalid select() return value"); + if (r && op.res == eb_chan_res_ok && val) { + *val = op.val; + } + return (r ? op.res : eb_chan_res_stalled); +} + +#pragma mark - Multiplexing - +#define next_idx(nops, delta, idx) (delta == 1 && idx == nops-1 ? 0 : ((delta == -1 && idx == 0) ? nops-1 : idx+delta)) +eb_chan_op *eb_chan_select_list(eb_nsec timeout, eb_chan_op *const ops[], size_t nops) { + assert(!nops || ops); + + const size_t k_attempt_multiplier = (eb_sys_ncores == 1 ? 1 : 500); + eb_nsec start_time = 0; + size_t idx_start = 0; + int8_t idx_delta = 0; + if (nops > 1) { + /* Assign idx_start/idx_delta, which control the op pseudo-randomization */ + start_time = eb_time_now(); + idx_start = (start_time/1000)%nops; + idx_delta = (!((start_time/10000)%2) ? 1 : -1); + } + + bool co[nops]; + memset(co, 0, sizeof(co)); + + eb_chan_op *result = NULL; + do_state state = { + .ops = ops, + .nops = nops, + .cleanup_ops = co, + .timeout = timeout, + .port = NULL}; + + if (timeout == eb_nsec_zero) { + /* ## timeout == 0: try every op exactly once; if none of them can proceed, return NULL. */ + for (size_t i = 0, idx = idx_start; i < nops; i++, idx = next_idx(nops, idx_delta, idx)) { + eb_chan_op *op = ops[idx]; + op_result r; + while ((r = try_op(&state, op, idx)) == op_result_retry) { + if (eb_sys_ncores == 1) { + /* On uniprocessor machines, yield to the scheduler because we can't continue until another + thread updates the channel's state. */ + sched_yield(); + } + } + + /* If the op completed, we need to exit! */ + if (r == op_result_complete) { + result = op; + goto cleanup; + } + } + } else { + /* ## timeout != 0 */ + if (timeout != eb_nsec_forever && !start_time) { + start_time = eb_time_now(); + } + + for (;;) { + /* ## Fast path: loop over our operations to see if one of them was able to send/receive. (If not, + we'll enter the slow path where we put our thread to sleep until we're signaled.) */ + for (size_t i = 0, idx = idx_start; i < k_attempt_multiplier*nops; i++, idx = next_idx(nops, idx_delta, idx)) { + eb_chan_op *op = ops[idx]; + op_result r = try_op(&state, op, idx); + /* If the op completed, we need to exit! */ + if (r == op_result_complete) { + result = op; + goto cleanup; + } + } + + /* ## Slow path: we weren't able to find an operation that could send/receive, so we'll create a + port to receive notifications on and put this thread to sleep until someone wakes us up. */ + if (!state.port) { + /* Create our port that we'll attach to channels so that we can be notified when events occur. */ + state.port = eb_port_create(); + eb_assert_or_recover(state.port, goto cleanup); + + /* Register our port for the appropriate notifications on every channel. */ + /* This adds 'port' to the channel's sends/recvs (depending on the op), which we clean up at the + end of this function. */ + for (size_t i = 0; i < nops; i++) { + eb_chan_op *op = ops[i]; + eb_chan c = op->chan; + if (c) { + port_list_add((op->send ? c->sends : c->recvs), state.port); + } + } + } + + /* Before we go to sleep, call try_op() for every op until we get a non-busy return value. This way we'll ensure + that no op is actually able to be performed, and we'll also ensure that 'port' is registered as the 'unbuf_port' + for the necessary channels. */ + for (size_t i = 0, idx = idx_start; i < nops; i++, idx = next_idx(nops, idx_delta, idx)) { + eb_chan_op *op = ops[idx]; + op_result r; + while ((r = try_op(&state, op, idx)) == op_result_retry) { + if (eb_sys_ncores == 1) { + /* On uniprocessor machines, yield to the scheduler because we can't continue until another + thread updates the channel's state. */ + sched_yield(); + } + } + + /* If the op completed, we need to exit! */ + if (r == op_result_complete) { + result = op; + goto cleanup; + } + } + + eb_nsec wait_timeout = eb_nsec_forever; + if (timeout != eb_nsec_forever) { + /* If we have a timeout, determine how much time has elapsed, because we may have timed-out. */ + eb_nsec elapsed = eb_time_now() - start_time; + /* Check if we timed-out */ + if (elapsed < timeout) { + wait_timeout = timeout - elapsed; + } else { + goto cleanup; + } + } + + /* Put our thread to sleep until someone alerts us of an event */ + eb_port_wait(state.port, wait_timeout); + } + } + + /* Cleanup! */ + cleanup: { + if (state.port) { + for (size_t i = 0; i < nops; i++) { + eb_chan_op *op = ops[i]; + eb_chan c = op->chan; + if (c) { + port_list ports = (op->send ? c->sends : c->recvs); + port_list_rm(ports, state.port); + port_list_signal_first(ports, state.port); + } + } + } + + cleanup_ops(&state); + + if (state.port) { + eb_port_release(state.port); + state.port = NULL; + } + } + + return result; +} +#endif /* EB_CHAN_H */ \ No newline at end of file diff --git a/event/goEvent.h b/event/goEvent.h index 7c47aad..1c8a7c3 100644 --- a/event/goEvent.h +++ b/event/goEvent.h @@ -15,22 +15,66 @@ #include #include "pub.h" -#include "../chan/src/chan.h" +//#include "../chan/src/chan.h" +#include "../chan/eb_chan.h" +//chan_t * events = NULL; +eb_chan events; -chan_t * events = NULL; +void go_send(char*); +void go_sleep(void); + +bool sending = false; + +void startev(){ +// events = chan_init(1024); + events = eb_chan_create(1024); + eb_chan_retain(events); + sending = true; + add_event("q"); +} + +//bool done = false; +void pollEv(){ + if(events == NULL) return; + for(;eb_chan_buf_len(events)!=0;){ + char* tmp; + if(eb_chan_try_recv(events,(const void**) &tmp) == eb_chan_res_ok){ + go_send(tmp); + free(tmp); + }else{ + // + } + } +} + +void endPoll(){ + sending = false; + pollEv();//remove last things from channel + eb_chan_release(events); +} void dispatch_proc(iohook_event * const event) { - char buffer[256] = { 0 }; - size_t length = snprintf(buffer, sizeof(buffer), - "{id:%i,when:%" PRIu64 ",mask=0x%X", - event->type, event->time, event->mask); + if(!sending) return; +//leaking memory? hope not + char* buffer = calloc(200,sizeof(char)); +// char buffer[256] = { 0 }; +// size_t length = snprintf(buffer, sizeof(buffer), +// "{id:%i,when:%" PRIu64 ",mask=0x%X", +// event->type, event->time, event->mask); switch (event->type) { + case EVENT_HOOK_ENABLED: + case EVENT_HOOK_DISABLED: + sprintf(buffer,"{\"id\":%i,\"time\":%" PRIu64 ",\"mask\":%hu,\"reserved\":%hu}", + event->type, event->time, event->mask,event->reserved); +// fprintf(stdout,"hook enabled"); + break;//send it? case EVENT_KEY_PRESSED: case EVENT_KEY_RELEASED: case EVENT_KEY_TYPED: - length = snprintf(buffer + length, sizeof(buffer) - length, - ",keycode:%hu,rawcode:%hu,keychar:%hu}", + sprintf(buffer, + "{\"id\":%i,\"time\":%" PRIu64 ",\"mask\":%hu,\"reserved\":%hu,\"keycode\":%hu,\"rawcode\":%hu,\"keychar\":%hu}", + event->type, event->time, event->mask,event->reserved, event->data.keyboard.keycode, event->data.keyboard.rawcode, event->data.keyboard.keychar); @@ -40,16 +84,18 @@ void dispatch_proc(iohook_event * const event) { case EVENT_MOUSE_CLICKED: case EVENT_MOUSE_MOVED: case EVENT_MOUSE_DRAGGED: - snprintf(buffer + length, sizeof(buffer) - length, - ",x:%u,y:%u,button:%u,clicks:%u}", + sprintf(buffer, + "{\"id\":%i,\"time\":%" PRIu64 ",\"mask\":%hu,\"reserved\":%hu,\"x\":%hd,\"y\":%hd,\"button\":%u,\"clicks\":%u}", + event->type, event->time, event->mask,event->reserved, event->data.mouse.x, event->data.mouse.y, event->data.mouse.button, event->data.mouse.clicks); break; case EVENT_MOUSE_WHEEL: - snprintf(buffer + length, sizeof(buffer) - length, - ",clicks:%hu,x:%hd,y:%hd,type:%hu,ammount:%hu,rotation:%hd,direction:%hu}", + sprintf(buffer, + "{\"id\":%i,\"time\":%" PRIu64 ",\"mask\":%hu,\"reserved\":%hu,\"clicks\":%hu,\"x\":%hd,\"y\":%hd,\"type\":%hu,\"ammount\":%hu,\"rotation\":%hd,\"direction\":%hu}", + event->type, event->time, event->mask, event->reserved, event->data.wheel.clicks, event->data.wheel.x, event->data.wheel.y, @@ -59,21 +105,29 @@ void dispatch_proc(iohook_event * const event) { event->data.wheel.direction); break; default: - fprintf(stderr,"\nError on file: %s, unusual event->type\n",__FILE__); + fprintf(stderr,"\nError on file: %s, unusual event->type: %i\n",__FILE__,event->type); return; } //to-do remove this for for(int i = 0; i < 5; i++){ - switch(chan_select(NULL,0,NULL,&events,1,(void**) &buffer)){ - case 0: - fprintf(stdout,"\nitem sent: %s",buffer); + switch(eb_chan_try_send(events,buffer)){ +// switch(chan_select(NULL,0,NULL,&events,1,(void**) &buffer)){ + case eb_chan_res_ok: +// case 0: +// fprintf(stdout,"\nlen:%i,item sent: %s",chan_size(events),buffer); +// fprintf(stdout,"\nlen:%i,item sent: %s",eb_chan_buf_len(events),buffer); + i=5; break; default: - chan_dispose(events); - fprintf(stdout,"\n%i",i); + if (i == 4) {//let's not leak memory + free(buffer); + } +// chan_dispose(events); +// fprintf(stdout,"%i",i); continue; } } + // fprintf(stdout, "----%s\n", buffer); } diff --git a/extern.go b/extern.go index 82405cc..ffe1c70 100644 --- a/extern.go +++ b/extern.go @@ -6,16 +6,32 @@ package hook // #include "event/hook_async.h" */ import "C" -import "time" +import ( + "encoding/json" + "fmt" + "log" + "time" +) //export go_send func go_send(s *C.char) { - str := C.GoString(s) - ev <- str + str := []byte(C.GoString(s)) + fmt.Println(string(str)) + out := Event{} + err := json.Unmarshal(str, &out) + if err != nil{ + log.Fatal(err) + } + out.When = time.Now() //at least it's consistent + if err != nil { + log.Fatal(err) + } + //todo: maybe make non-bloking + ev <- out } -//export go_sleep -func go_sleep(){ - //todo: find smallest time that does not destroy the cpu utilization - time.Sleep(time.Millisecond*50) -} \ No newline at end of file +////export go_sleep +//func go_sleep(){ +// +// time.Sleep(time.Millisecond*50) +//} diff --git a/hook.go b/hook.go index 92132ae..157c476 100644 --- a/hook.go +++ b/hook.go @@ -20,64 +20,79 @@ package hook //#cgo windows LDFLAGS: -lgdi32 -luser32 // #include "event/hook_async.h" -#include "chan/src/chan.h" #include "event/goEvent.h" -void go_send(char*); -void go_sleep(void); -extern chan_t* events; - -void startev(){ - events = chan_init(1024); - add_event("q"); -} -bool done = false; -void pollEv(){ - while(!done){ - for(int i=chan_size(events); i >0;i--){ - char* tmp; - chan_recv(events,(void**) &tmp); - go_send(tmp); - } - //go_sleep(); - } -} - -void endPoll(){ - done = true; -} - */ import "C" import ( - // "fmt" - "unsafe" + "time" ) -var ev chan string = make(chan string,128) +//todo: add enums +const ( + HOOK_ENABLED = 1 //iota + HOOK_DISABLED = 2 + KEY_TYPED = 3 + KEY_PRESSED = 4 + KEY_RELEASED = 5 + MOUSE_CLICKED = 6 + MOUSE_PRESSED = 7 + MOUSE_RELEASED = 8 + MOUSE_MOVED = 9 + MOUSE_DRAGGED = 10 + MOUSE_WHEEL = 11 +) -// AddEvent add event listener -func AddEvent(key string) int { - cs := C.CString(key) - defer C.free(unsafe.Pointer(cs)) - - eve := C.add_event(cs) - geve := int(eve) - - return geve +type Event struct { + Kind uint8 `json:"id"` + When time.Time + Mask uint16 `json:"mask"` + Reserved uint16 `json:"reserved"` + Keycode uint16 `json:"keycode"` + Rawcode uint16 `json:"rawcode"` + Keychar uint16 `json:"keychar"` + Button uint16 `json:"button"` + Clicks uint16 `json:"clicks"` + X int16 `json:"x"` + Y int16 `json:"y"` + Ammount uint16 `json:"ammount"` + Rotation int16 `json:"rotation"` + Direction uint8 `json:"direction"` } -func StartEvent() chan string{ - C.startev() - go C.pollEv() +var ( + ev chan Event = make(chan Event, 1024) + asyncon bool = false +) + +func Start() chan Event { + //fmt.Print("Here1") + asyncon = true + go C.startev() + go func() { + for { + C.pollEv() + time.Sleep(time.Millisecond * 50) + //todo: find smallest time that does not destroy the cpu utilization + //fmt.Println("_here_") + if ! asyncon { + return + } + } + //fmt.Print("WOOOOOOOOOT") + }() + //fmt.Print("Here2") return ev } - // StopEvent stop event listener -func StopEvent() { +func End() { C.endPoll() C.stop_event() - C.chan_close(C.events); + for len(ev) != 0 { + <-ev + } + asyncon = false + //C.chan_close(C.events); } diff --git a/test/main.go b/test/main.go index 7bb745d..590f714 100644 --- a/test/main.go +++ b/test/main.go @@ -2,22 +2,28 @@ package main import ( "fmt" + "time" - "github.com/robotn/gohook" + "github.com/cauefcr/gohook" ) func main() { - s := hook.StartEvent() - - go func() { - fmt.Print("woo!") - for i:=range s { - fmt.Println(i) + s := hook.Start() + tout := time.After(time.Second * 10) + done := false + for !done { + select { + case i := <-s: + if i.Keychar == uint16('q') { + tout = time.After(1 * time.Millisecond) + } + fmt.Printf("%+v\n", i) + case <-tout: + fmt.Print("Done.") + done = true + break; } - }() - // hook.AsyncHook() - veve := hook.AddEvent("v") - if veve == 0 { - fmt.Println("v...") } + hook.End() + }