2 #include <aura/private.h>
/*
 * aura_eventsys_get_autocreate() - event-loop accessor used by the call and
 * wait paths of this file (its result is stored in a struct aura_eventloop *).
 *
 * Judging by the log messages below, an event loop is created on demand for
 * the node; the failure case is logged as an error.
 *
 * NOTE(review): this listing is lossy - the leading numbers are the original
 * source line numbers, and the braces, the lookup of an existing loop, the
 * creation call and the return statement are not in view. Confirm the exact
 * control flow against the real file.
 */
6 static void *aura_eventsys_get_autocreate(
struct aura_node *node)
10 slog(3, SLOG_DEBUG,
"aura: Auto-creating eventsystem for node");
13 slog(0, SLOG_ERROR,
"aura: eventloop auto-creation failed");
/* Flag the loop as auto-created; another fragment of this file tests
 * loop->autocreated right after closing the transport. */
16 loop->autocreated = 1;
/* Attach the loop to the node (stored in node->eventsys_data). */
17 aura_eventloop_set_data(node, loop);
/*
 * aura_open() - allocate a node and open it over the transport called @name.
 *
 * @name: transport name, resolved with aura_transport_lookup().
 * @opts: option string handed unchanged to the transport's open() hook.
 *
 * NOTE(review): this listing is lossy (braces, branch conditions, return
 * statements and cleanup are not in view). In particular the check of the
 * calloc() result and the conditions guarding the two SLOG_FATAL messages
 * cannot be seen here - confirm them against the real file.
 */
34 struct aura_node *
aura_open(
const char *name,
const char *opts)
/* Zero-initialised node, so every field not set below starts out as 0/NULL. */
36 struct aura_node *node = calloc(1,
sizeof(*node));
/* Raised for the duration of the transport's open(); aura_set_status()
 * reports a BUG when a status change happens while this flag is set. */
40 node->is_opening =
true;
/* Compared against a timestamp difference in aura_process_node_event().
 * NOTE(review): unit depends on aura_platform_timestamp() - presumably ms. */
42 node->poll_timeout = 250;
43 node->tr = aura_transport_lookup(name);
45 slog(0, SLOG_FATAL,
"Invalid transport name: %s", name);
/* All buffer queues start out empty. */
49 INIT_LIST_HEAD(&node->outbound_buffers);
50 INIT_LIST_HEAD(&node->inbound_buffers);
51 INIT_LIST_HEAD(&node->event_buffers);
52 INIT_LIST_HEAD(&node->buffer_pool);
53 node->gc_threshold = 10;
/* A freshly created node is offline. */
55 node->status = AURA_STATUS_OFFLINE;
62 ret = node->tr->open(node, opts);
67 slog(6, SLOG_LIVE,
"Created a node using transport: %s", name);
/* open() is done, status changes are allowed from now on. */
68 node->is_opening =
false;
72 slog(0, SLOG_FATAL,
"Error opening transport: %s", name);
/*
 * cleanup_buffer_queue() - walk buffer queue @q and log how many buffers
 * were cleaned up.
 *
 * @q:       list head of the queue to clean.
 * @destroy: passed as true for the inbound/outbound/pool queues next to the
 *           transport release, and as false when the outbound queue is
 *           cleared on going offline.
 *           NOTE(review): presumably selects destroying vs. releasing each
 *           buffer - the loop body is not in view, confirm.
 *
 * NOTE(review): this listing is lossy; local declarations, the loop body and
 * the braces are missing.
 */
78 static void cleanup_buffer_queue(
struct list_head *q,
bool destroy)
/* The _safe iterator variant is used, which permits unlinking the current
 * entry while iterating. */
83 list_for_each_safe(pos, tmp, q) {
93 slog(6, SLOG_LIVE,
"Cleaned up %d buffers", i);
107 node->tr->close(node);
110 if (loop->autocreated)
118 cleanup_buffer_queue(&node->inbound_buffers,
true);
119 cleanup_buffer_queue(&node->outbound_buffers,
true);
120 cleanup_buffer_queue(&node->buffer_pool,
true);
121 aura_transport_release(node->tr);
124 aura_etable_destroy(node->tbl);
131 slog(6, SLOG_LIVE,
"Transport closed");
/*
 * aura_handle_inbound() - dispatch inbound data for @node.
 *
 * For the object being handled, the visible branches are, in order:
 *   1. a method result whose object is not marked pending -> warning, dropped;
 *   2. the object has a calldonecb -> the callback is invoked with
 *      AURA_CALL_COMPLETED and the buffer;
 *   3. a method result while a synchronous call is running -> the result and
 *      the buffer are stored in node->sync_call_result / node->sync_ret_buf;
 *   4. otherwise, when sync_event_max > 0, the event is counted in
 *      sync_event_count for synchronous readout;
 *   5. otherwise the node's unhandled_evt_cb is called if set; a
 *      "Dropping event" warning is also visible.
 *
 * NOTE(review): this listing is lossy - the buffer dequeue loop, several
 * conditions, the slog() argument lines and all closing braces of the later
 * branches are not in view. The nesting of steps 4 and 5 is inferred from
 * statement order only; confirm against the real file.
 */
140 static void aura_handle_inbound(
struct aura_node *node)
144 struct aura_object *o;
/* Publish the object being handled; aura_get_current_object() returns this
 * field and warns when it is NULL. It is reset to NULL at the end. */
151 node->current_object = o;
154 slog(4, SLOG_DEBUG,
"Handling %s id %d (%s) sync_call_running=%d",
155 object_is_method(o) ?
"response" :
"event",
156 o->id, o->name, node->sync_call_running);
/* A method result nobody is waiting for. */
158 if (object_is_method(o) && !o->pending) {
159 slog(0, SLOG_WARN,
"Dropping orphan call result %d (%s)",
162 }
else if (o->calldonecb) {
/* A callback is registered on the object: hand the buffer over to it. */
163 slog(4, SLOG_DEBUG,
"Callback for method/event %d (%s)",
165 o->calldonecb(node, AURA_CALL_COMPLETED, buf, o->arg);
167 }
else if (object_is_method(o) && (node->sync_call_running)) {
/* Result of a synchronous call: store it in the node's sync_* fields. */
168 slog(4, SLOG_DEBUG,
"Completing call for method %d (%s)",
170 node->sync_call_result = AURA_CALL_COMPLETED;
171 node->sync_ret_buf = buf;
174 BUG(node,
"Internal BUG: pending evt count lesser than zero");
/* Synchronous event readout is enabled when sync_event_max is positive. */
177 if (node->sync_event_max > 0) {
/* Queue already at its limit.
 * NOTE(review): presumably the oldest event is dropped here to make room -
 * the statements doing so are not in view. */
179 if (node->sync_event_max <= node->sync_event_count) {
181 const struct aura_object *dummy;
184 BUG(node,
"Internal bug, no next event");
190 node->sync_event_count++;
191 slog(4, SLOG_DEBUG,
"Queued event %d (%s) for sync readout",
/* Last resort: the catch-all callback for events nobody claimed. */
195 if (node->unhandled_evt_cb)
196 node->unhandled_evt_cb(node, buf, node->unhandled_evt_arg);
198 slog(0, SLOG_WARN,
"Dropping event %d (%s)",
/* Handling finished: no current object any more. */
205 node->current_object = NULL;
229 if (!node->current_object) {
230 slog(0, SLOG_WARN,
"Looks like you're calling aura_get_current_object() outside the callback");
231 slog(0, SLOG_WARN,
"Don't do that - read the docs!");
233 return node->current_object;
245 return node->eventsys_data;
259 void (*cb)(
struct aura_node *node,
int newstatus,
void *arg),
262 node->status_changed_arg = arg;
263 node->status_changed_cb = cb;
276 void (*cb)(
const struct aura_pollfds *fd,
enum aura_fd_action act,
void *arg),
280 const struct aura_pollfds *fds;
281 int count = aura_get_pollfds(node, &fds);
282 node->fd_changed_arg = arg;
283 node->fd_changed_cb = cb;
284 if (node->fd_changed_cb)
285 for (i=0; i<count; i++)
286 node->fd_changed_cb(&fds[i], AURA_FD_ADDED, node->fd_changed_arg);
296 void (*cb)(
struct aura_node *node,
297 struct aura_export_table *old,
298 struct aura_export_table *
new,
302 node->etable_changed_arg = arg;
303 node->etable_changed_cb = cb;
315 void (*cb)(
struct aura_node *node,
320 node->unhandled_evt_cb = cb;
321 node->unhandled_evt_arg = arg;
340 void (*cb)(
struct aura_node *node,
341 struct aura_object *failed,
345 node->object_migration_failed_cb = cb;
346 node->object_migration_failed_arg = arg;
362 struct aura_object *o,
363 void (*calldonecb)(
struct aura_node *dev,
int status,
struct aura_buffer *ret,
void *arg),
367 struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
373 if(node->status != AURA_STATUS_ONLINE)
380 BUG(node,
"Node has no assosiated event system. Fix your code!");
382 isfirst = list_empty(&node->outbound_buffers);
384 o->calldonecb = calldonecb;
390 slog(4, SLOG_DEBUG,
"Queued call for id %d (%s), notifying node", o->id, o->name);
393 slog(4, SLOG_DEBUG,
"Notifying transport of queue status change");
394 node->last_checked = 0;
395 aura_eventloop_interrupt(loop);
413 struct aura_node *node,
414 struct aura_object *o,
419 struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
421 if (node->sync_call_running)
422 BUG(node,
"Internal bug: Synchronos call within a synchronos call");
424 node->sync_call_running =
true;
427 node->sync_call_result = ret;
435 slog(4, SLOG_DEBUG,
"Call completed");
436 *retbuf = node->sync_ret_buf;
439 node->sync_call_running =
false;
440 return node->sync_call_result;
463 struct aura_node *node,
465 void (*calldonecb)(
struct aura_node *dev,
int status,
struct aura_buffer *ret,
void *arg),
473 struct aura_object *o = aura_etable_find_id(node->tbl,
id);
478 buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
506 struct aura_node *node,
508 void (*calldonecb)(
struct aura_node *dev,
int status,
struct aura_buffer *ret,
void *arg),
511 struct aura_object *o = aura_etable_find_id(node->tbl,
id);
515 if (!object_is_event(o))
518 o->calldonecb = calldonecb;
537 struct aura_node *node,
539 void (*calldonecb)(
struct aura_node *dev,
int status,
struct aura_buffer *ret,
void *arg),
542 struct aura_object *o = aura_etable_find(node->tbl, event);
546 if (!object_is_event(o))
549 o->calldonecb = calldonecb;
565 struct aura_node *node,
567 void (*calldonecb)(
struct aura_node *dev,
int status,
struct aura_buffer *ret,
void *arg),
571 struct aura_object *o;
576 o = aura_etable_find(node->tbl, name);
581 buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
609 struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
610 while (node->status != status)
627 struct aura_node *node,
635 struct aura_object *o = aura_etable_find_id(node->tbl,
id);
637 if (node->sync_call_running)
638 BUG(node,
"Internal bug: Synchronos call within a synchronos call");
643 va_start(ap, retbuf);
644 buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
648 slog(2, SLOG_WARN,
"Serialization failed");
667 struct aura_node *node,
674 struct aura_object *o = aura_etable_find(node->tbl, name);
679 va_start(ap, retbuf);
680 buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
684 slog(2, SLOG_WARN,
"Serialization failed");
713 while(node->sync_event_max >= count) {
714 const struct aura_object *o;
718 BUG(node,
"Internal bug while resizing event queue (failed to drop some events)");
721 node->sync_event_max = count;
732 return node->sync_event_count;
755 struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
757 while (!node->sync_event_count) {
765 *obj = (
const struct aura_object *)(*retbuf)->object;
766 node->sync_event_count--;
791 if (o->pending && o->calldonecb)
792 o->calldonecb(node, AURA_CALL_TRANSPORT_FAIL, NULL, o->arg);
796 node->sync_call_result = AURA_CALL_TRANSPORT_FAIL;
797 node->sync_ret_buf = NULL;
810 int oldstatus = node->status;
811 node->status = status;
813 if (oldstatus == status)
816 if (node->is_opening)
817 BUG(node,
"Transport BUG: Do not call aura_set_status in open()");
819 if ((oldstatus == AURA_STATUS_OFFLINE) && (status == AURA_STATUS_ONLINE)) {
822 slog(2, SLOG_INFO,
"Node %s is now going online", node->tr->name);
823 slog(2, SLOG_INFO,
"--- Dumping export table ---");
824 for (i=0; i< node->tbl->next; i++) {
825 slog(2, SLOG_INFO,
"%d. %s %s %s(%s ) [out %d bytes] | [in %d bytes] ",
826 node->tbl->objects[i].id,
827 object_is_method((&node->tbl->objects[i])) ?
"METHOD" :
"EVENT ",
828 node->tbl->objects[i].ret_pprinted,
829 node->tbl->objects[i].name,
830 node->tbl->objects[i].arg_pprinted,
831 node->tbl->objects[i].arglen,
832 node->tbl->objects[i].retlen);
834 slog(1, SLOG_INFO,
"-------------8<-------------");
836 if ((oldstatus == AURA_STATUS_ONLINE) && (status == AURA_STATUS_OFFLINE)) {
839 slog(2, SLOG_INFO,
"Node %s going offline, clearing outbound queue",
841 cleanup_buffer_queue(&node->outbound_buffers,
false);
843 aura_handle_inbound(node);
845 for (i=0; i < node->tbl->next; i++) {
846 struct aura_object *o;
847 o=&node->tbl->objects[i];
848 if (o->pending && o->calldonecb)
849 o->calldonecb(node, AURA_CALL_TRANSPORT_FAIL, NULL, o->arg);
854 node->sync_call_result = AURA_CALL_TRANSPORT_FAIL;
855 node->sync_ret_buf = NULL;
858 if (node->status_changed_cb)
859 node->status_changed_cb(node, status, node->status_changed_arg);
872 if (aura_get_host_endianness() != en)
873 node->need_endian_swap =
true;
/*
 * aura_process_node_event() - run the transport's loop() hook for @node and
 * then dispatch whatever arrived inbound.
 *
 * @node: node to service.
 * @fd:   descriptor the event came from, or NULL when there is none; a
 *        non-NULL @fd is forwarded to the transport's loop() hook.
 *
 * NOTE(review): this listing is lossy (braces and blank lines are missing);
 * the visible statements are documented as-is.
 */
886 void aura_process_node_event(
struct aura_node *node,
const struct aura_pollfds *fd)
/* Descriptor-driven invocation; transports without a loop() hook are skipped. */
888 if (fd && node->tr->loop)
889 node->tr->loop(node, fd);
891 uint64_t curtime = aura_platform_timestamp();
/* Periodic invocation with a NULL descriptor once more than poll_timeout has
 * elapsed since last_checked. Another fragment of this file zeroes
 * last_checked before interrupting the event loop.
 * NOTE(review): presumably that forces this branch on the next pass - confirm. */
893 if ((curtime - node->last_checked > node->poll_timeout) && node->tr->loop) {
894 node->tr->loop(node, NULL);
895 node->last_checked = curtime;
/* Finally dispatch the inbound data for this node. */
899 aura_handle_inbound(node);
/*
 * aura_eventloop_set_data() - attach event-loop data to a node.
 *
 * @node: node to update.
 * @data: event loop pointer, stored in node->eventsys_data (the same field
 *        another fragment of this file returns to callers).
 *
 * NOTE(review): braces are missing from this listing; only the assignment
 * below is in view.
 */
902 void aura_eventloop_set_data(
struct aura_node *node,
struct aura_eventloop *data)
904 node->eventsys_data = data;
int aura_get_next_event(struct aura_node *node, const struct aura_object **obj, struct aura_buffer **retbuf)
void aura_set_status(struct aura_node *node, int status)
struct aura_buffer * aura_dequeue_buffer(struct list_head *head)
int aura_core_call(struct aura_node *node, struct aura_object *o, struct aura_buffer **retbuf, struct aura_buffer *argbuf)
#define aura_eventloop_create(...)
void aura_wait_status(struct aura_node *node, int status)
struct aura_eventloop * aura_eventloop_get_data(struct aura_node *node)
static void aura_buffer_rewind(struct aura_buffer *buf)
void aura_object_migration_failed_cb(struct aura_node *node, void(*cb)(struct aura_node *node, struct aura_object *failed, void *arg), void *arg)
int aura_set_event_callback_raw(struct aura_node *node, int id, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg)
void aura_handle_events(struct aura_eventloop *loop)
int aura_call_raw(struct aura_node *node, int id, struct aura_buffer **retbuf,...)
void aura_eventloop_destroy(struct aura_eventloop *loop)
int aura_get_pending_events(struct aura_node *node)
int aura_call(struct aura_node *node, const char *name, struct aura_buffer **retbuf,...)
struct aura_node * aura_open(const char *name, const char *opts)
void aura_fd_changed_cb(struct aura_node *node, void(*cb)(const struct aura_pollfds *fd, enum aura_fd_action act, void *arg), void *arg)
void aura_call_fail(struct aura_node *node, struct aura_object *o)
int aura_start_call_raw(struct aura_node *node, int id, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg,...)
void aura_eventloop_del(struct aura_node *node)
void aura_queue_buffer(struct list_head *list, struct aura_buffer *buf)
int aura_core_start_call(struct aura_node *node, struct aura_object *o, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg, struct aura_buffer *buf)
void aura_enable_sync_events(struct aura_node *node, int count)
const struct aura_object * aura_get_current_object(struct aura_node *node)
Obtain the pointer to the current aura_object.
void aura_status_changed_cb(struct aura_node *node, void(*cb)(struct aura_node *node, int newstatus, void *arg), void *arg)
void aura_etable_changed_cb(struct aura_node *node, void(*cb)(struct aura_node *node, struct aura_export_table *old, struct aura_export_table *new, void *arg), void *arg)
void aura_unhandled_evt_cb(struct aura_node *node, void(*cb)(struct aura_node *node, struct aura_buffer *buf, void *arg), void *arg)
void aura_buffer_release(struct aura_buffer *buf)
void aura_close(struct aura_node *node)
void aura_set_node_endian(struct aura_node *node, enum aura_endianness en)
void aura_buffer_destroy(struct aura_buffer *buf)
int aura_start_call(struct aura_node *node, const char *name, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg,...)
struct aura_object * object
int aura_set_event_callback(struct aura_node *node, const char *event, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg)