aura  0.1
 All Data Structures Functions Variables Modules Pages
aura.c
1 #include <aura/aura.h>
2 #include <aura/private.h>
3 #include <inttypes.h>
4 
5 
6 static void *aura_eventsys_get_autocreate(struct aura_node *node)
7 {
8  struct aura_eventloop *loop = aura_eventloop_get_data(node);
9  if (loop == NULL) {
10  slog(3, SLOG_DEBUG, "aura: Auto-creating eventsystem for node");
11  loop = aura_eventloop_create(node);
12  if (!loop) {
13  slog(0, SLOG_ERROR, "aura: eventloop auto-creation failed");
14  aura_panic(node);
15  }
16  loop->autocreated = 1;
17  aura_eventloop_set_data(node, loop);
18  }
19  return loop;
20 }
21 
34 struct aura_node *aura_open(const char *name, const char *opts)
35 {
36  struct aura_node *node = calloc(1, sizeof(*node));
37  int ret = 0;
38  if (!node)
39  return NULL;
40  node->is_opening = true;
41 
42  node->poll_timeout = 250; /* 250 ms default */
43  node->tr = aura_transport_lookup(name);
44  if (!node->tr) {
45  slog(0, SLOG_FATAL, "Invalid transport name: %s", name);
46  goto err_free_node;
47  }
48 
49  INIT_LIST_HEAD(&node->outbound_buffers);
50  INIT_LIST_HEAD(&node->inbound_buffers);
51  INIT_LIST_HEAD(&node->event_buffers);
52  INIT_LIST_HEAD(&node->buffer_pool);
53  node->gc_threshold = 10; /* This should be more than enough */
54 
55  node->status = AURA_STATUS_OFFLINE;
56 
57  /* Eventsystem will be either lazy-initialized or created via
58  * aura_eventloop_* functions
59  */
60 
61  if (node->tr->open)
62  ret = node->tr->open(node, opts);
63 
64  if (ret != 0)
65  goto err_free_node;
66 
67  slog(6, SLOG_LIVE, "Created a node using transport: %s", name);
68  node->is_opening = false;
69  return node;
70 
71 err_free_node:
72  slog(0, SLOG_FATAL, "Error opening transport: %s", name);
73  free(node);
74  return NULL;
75 }
76 
77 
78 static void cleanup_buffer_queue(struct list_head *q, bool destroy)
79 {
80  int i = 0;
81 
82  struct list_head *pos, *tmp;
83  list_for_each_safe(pos, tmp, q) {
84  struct aura_buffer *b;
85  b = list_entry(pos, struct aura_buffer, qentry);
86  list_del(pos);
87  if (!destroy) /* Just return it to the pool */
89  else /* Nuke it, we're closing down */
91  i++;
92  }
93  slog(6, SLOG_LIVE, "Cleaned up %d buffers", i);
94 }
95 
96 
102 void aura_close(struct aura_node *node)
103 {
104  struct aura_eventloop *loop = aura_eventloop_get_data(node);
105 
106  if (node->tr->close)
107  node->tr->close(node);
108 
109  if (loop) {
110  if (loop->autocreated)
112  else
113  aura_eventloop_del(node);
114  }
115 
116  /* After transport shutdown we need to clean up
117  remaining buffers */
118  cleanup_buffer_queue(&node->inbound_buffers, true);
119  cleanup_buffer_queue(&node->outbound_buffers, true);
120  cleanup_buffer_queue(&node->buffer_pool, true);
121  aura_transport_release(node->tr);
122  /* Check if we have an export table registered and nuke it */
123  if (node->tbl)
124  aura_etable_destroy(node->tbl);
125 
126  /* Free file descriptors */
127  if (node->fds)
128  free(node->fds);
129 
130  free(node);
131  slog(6, SLOG_LIVE, "Transport closed");
132 }
133 
139 /* This one is small, but tricky */
140 static void aura_handle_inbound(struct aura_node *node)
141 {
142  while(1) {
143  struct aura_buffer *buf;
144  struct aura_object *o;
145 
146  buf = aura_dequeue_buffer(&node->inbound_buffers);
147  if (!buf)
148  break;
149 
150  o = buf->object;
151  node->current_object = o;
152  aura_buffer_rewind(buf);
153 
154  slog(4, SLOG_DEBUG, "Handling %s id %d (%s) sync_call_running=%d",
155  object_is_method(o) ? "response" : "event",
156  o->id, o->name, node->sync_call_running);
157 
158  if (object_is_method(o) && !o->pending) {
159  slog(0, SLOG_WARN, "Dropping orphan call result %d (%s)",
160  o->id, o->name);
161  aura_buffer_release(buf);
162  } else if (o->calldonecb) {
163  slog(4, SLOG_DEBUG, "Callback for method/event %d (%s)",
164  o->id, o->name);
165  o->calldonecb(node, AURA_CALL_COMPLETED, buf, o->arg);
166  aura_buffer_release(buf);
167  } else if (object_is_method(o) && (node->sync_call_running)) {
168  slog(4, SLOG_DEBUG, "Completing call for method %d (%s)",
169  o->id, o->name);
170  node->sync_call_result = AURA_CALL_COMPLETED;
171  node->sync_ret_buf = buf;
172  o->pending--;
173  if (o->pending < 0)
174  BUG(node, "Internal BUG: pending evt count lesser than zero");
175  } else {
176  /* This one is tricky. We have an event with no callback */
177  if (node->sync_event_max > 0) { /* Queue it up into event_queue if it's enabled */
178  /* If we have an overrun - drop the oldest event to free up space first*/
179  if (node->sync_event_max <= node->sync_event_count) {
180  struct aura_buffer *todrop;
181  const struct aura_object *dummy;
182  int ret = aura_get_next_event(node, &dummy, &todrop);
183  if (ret != 0)
184  BUG(node, "Internal bug, no next event");
185  aura_buffer_release(todrop);
186  }
187 
188  /* Now just queue the next one */
189  aura_queue_buffer(&node->event_buffers, buf);
190  node->sync_event_count++;
191  slog(4, SLOG_DEBUG, "Queued event %d (%s) for sync readout",
192  o->id, o->name);
193  } else {
194  /* Last resort - try the catch-all event callback */
195  if (node->unhandled_evt_cb)
196  node->unhandled_evt_cb(node, buf, node->unhandled_evt_arg);
197  else /* Or just drop it with a warning */
198  slog(0, SLOG_WARN, "Dropping event %d (%s)",
199  o->id, o->name);
200  aura_buffer_release(buf);
201  }
202  }
203  }
204 
205  node->current_object = NULL;
206 }
207 
226 const struct aura_object *aura_get_current_object(struct aura_node *node)
227 {
228  /* Make some noise */
229  if (!node->current_object) {
230  slog(0, SLOG_WARN, "Looks like you're calling aura_get_current_object() outside the callback");
231  slog(0, SLOG_WARN, "Don't do that - read the docs!");
232  }
233  return node->current_object;
234 }
235 
243 struct aura_eventloop *aura_eventloop_get_data(struct aura_node *node)
244 {
245  return node->eventsys_data;
246 }
247 
248 
249 
258 void aura_status_changed_cb(struct aura_node *node,
259  void (*cb)(struct aura_node *node, int newstatus, void *arg),
260  void *arg)
261 {
262  node->status_changed_arg = arg;
263  node->status_changed_cb = cb;
264 }
265 
275 void aura_fd_changed_cb(struct aura_node *node,
276  void (*cb)(const struct aura_pollfds *fd, enum aura_fd_action act, void *arg),
277  void *arg)
278 {
279  int i;
280  const struct aura_pollfds *fds;
281  int count = aura_get_pollfds(node, &fds);
282  node->fd_changed_arg = arg;
283  node->fd_changed_cb = cb;
284  if (node->fd_changed_cb)
285  for (i=0; i<count; i++)
286  node->fd_changed_cb(&fds[i], AURA_FD_ADDED, node->fd_changed_arg);
287 }
288 
295 void aura_etable_changed_cb(struct aura_node *node,
296  void (*cb)(struct aura_node *node,
297  struct aura_export_table *old,
298  struct aura_export_table *new,
299  void *arg),
300  void *arg)
301 {
302  node->etable_changed_arg = arg;
303  node->etable_changed_cb = cb;
304 }
305 
314 void aura_unhandled_evt_cb(struct aura_node *node,
315  void (*cb)(struct aura_node *node,
316  struct aura_buffer *buf,
317  void *arg),
318  void *arg)
319 {
320  node->unhandled_evt_cb = cb;
321  node->unhandled_evt_arg = arg;
322 }
323 
339 void aura_object_migration_failed_cb(struct aura_node *node,
340  void (*cb)(struct aura_node *node,
341  struct aura_object *failed,
342  void *arg),
343  void *arg)
344 {
345  node->object_migration_failed_cb = cb;
346  node->object_migration_failed_arg = arg;
347 }
348 
361 int aura_core_start_call(struct aura_node *node,
362  struct aura_object *o,
363  void (*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg),
364  void *arg,
365  struct aura_buffer *buf)
366 {
367  struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
368  int isfirst;
369 
370  if (!o)
371  return -EBADSLT;
372 
373  if(node->status != AURA_STATUS_ONLINE)
374  return -ENOEXEC;
375 
376  if (o->pending)
377  return -EIO;
378 
379  if (!loop)
380  BUG(node, "Node has no assosiated event system. Fix your code!");
381 
382  isfirst = list_empty(&node->outbound_buffers);
383 
384  o->calldonecb = calldonecb;
385  o->arg = arg;
386  buf->object = o;
387  o->pending++;
388 
389  aura_queue_buffer(&node->outbound_buffers, buf);
390  slog(4, SLOG_DEBUG, "Queued call for id %d (%s), notifying node", o->id, o->name);
391 
392  if (isfirst) {
393  slog(4, SLOG_DEBUG, "Notifying transport of queue status change");
394  node->last_checked = 0;
395  aura_eventloop_interrupt(loop);
396  };
397 
398  return 0;
399 }
400 
413  struct aura_node *node,
414  struct aura_object *o,
415  struct aura_buffer **retbuf,
416  struct aura_buffer *argbuf)
417 {
418  int ret;
419  struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
420 
421  if (node->sync_call_running)
422  BUG(node, "Internal bug: Synchronos call within a synchronos call");
423 
424  node->sync_call_running = true;
425 
426  if ((ret=aura_core_start_call(node, o, NULL, NULL, argbuf))) {
427  node->sync_call_result = ret;
428  goto bailout;
429  }
430 
431  while (o->pending) {
432  aura_handle_events(loop);
433  }
434 
435  slog(4, SLOG_DEBUG, "Call completed");
436  *retbuf = node->sync_ret_buf;
437 
438 bailout:
439  node->sync_call_running = false;
440  return node->sync_call_result;
441 }
442 
463  struct aura_node *node,
464  int id,
465  void (*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg),
466  void *arg,
467  ...)
468 {
469  va_list ap;
470  struct aura_buffer *buf;
471  int ret;
472 
473  struct aura_object *o = aura_etable_find_id(node->tbl, id);
474  if (!o)
475  return -EBADSLT;
476 
477  va_start(ap, arg);
478  buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
479  va_end(ap);
480 
481  if (!buf)
482  return -EIO;
483 
484  ret = aura_core_start_call(node, o, calldonecb, arg, buf);
485 
486  if (ret != 0)
487  aura_buffer_release(buf);
488 
489  return ret;
490 }
491 
506  struct aura_node *node,
507  int id,
508  void (*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg),
509  void *arg)
510 {
511  struct aura_object *o = aura_etable_find_id(node->tbl, id);
512  if (!o)
513  return -EBADSLT;
514 
515  if (!object_is_event(o))
516  return -EBADF;
517 
518  o->calldonecb = calldonecb;
519  o->arg = arg;
520  return 0;
521 }
522 
537  struct aura_node *node,
538  const char *event,
539  void (*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg),
540  void *arg)
541 {
542  struct aura_object *o = aura_etable_find(node->tbl, event);
543  if (!o)
544  return -EBADSLT;
545 
546  if (!object_is_event(o))
547  return -EBADF;
548 
549  o->calldonecb = calldonecb;
550  o->arg = arg;
551  return 0;
552 }
553 
565  struct aura_node *node,
566  const char *name,
567  void (*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg),
568  void *arg,
569  ...)
570 {
571  struct aura_object *o;
572  va_list ap;
573  struct aura_buffer *buf;
574  int ret;
575 
576  o = aura_etable_find(node->tbl, name);
577  if (!o)
578  return -ENOENT;
579 
580  va_start(ap, arg);
581  buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
582  va_end(ap);
583  if (!buf)
584  return -EIO;
585 
586  ret = aura_core_start_call(node, o, calldonecb, arg, buf);
587 
588  if (ret != 0)
589  aura_buffer_release(buf);
590 
591  return ret;
592 }
593 
607 void aura_wait_status(struct aura_node *node, int status)
608 {
609  struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
610  while (node->status != status)
611  aura_handle_events(loop);
612 }
613 
614 
627  struct aura_node *node,
628  int id,
629  struct aura_buffer **retbuf,
630  ...)
631 {
632  va_list ap;
633  struct aura_buffer *buf;
634 
635  struct aura_object *o = aura_etable_find_id(node->tbl, id);
636 
637  if (node->sync_call_running)
638  BUG(node, "Internal bug: Synchronos call within a synchronos call");
639 
640  if (!o)
641  return -EBADSLT;
642 
643  va_start(ap, retbuf);
644  buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
645  va_end(ap);
646 
647  if (!buf) {
648  slog(2, SLOG_WARN, "Serialization failed");
649  return -EIO;
650  }
651 
652  return aura_core_call(node, o, retbuf, buf);
653 }
654 
667  struct aura_node *node,
668  const char *name,
669  struct aura_buffer **retbuf,
670  ...)
671 {
672  va_list ap;
673  struct aura_buffer *buf;
674  struct aura_object *o = aura_etable_find(node->tbl, name);
675 
676  if (!o)
677  return -EBADSLT;
678 
679  va_start(ap, retbuf);
680  buf = aura_serialize(node, o->arg_fmt, o->arglen, ap);
681  va_end(ap);
682 
683  if (!buf) {
684  slog(2, SLOG_WARN, "Serialization failed");
685  return -EIO;
686  }
687 
688  return aura_core_call(node, o, retbuf, buf);
689 }
690 
691 
711 void aura_enable_sync_events(struct aura_node *node, int count)
712 {
713  while(node->sync_event_max >= count) {
714  const struct aura_object *o;
715  struct aura_buffer *buf;
716  int ret = aura_get_next_event(node, &o, &buf);
717  if (ret!=0)
718  BUG(node, "Internal bug while resizing event queue (failed to drop some events)");
719  aura_buffer_release(buf);
720  }
721  node->sync_event_max = count;
722 }
723 
730 int aura_get_pending_events(struct aura_node *node)
731 {
732  return node->sync_event_count;
733 }
734 
753 int aura_get_next_event(struct aura_node *node, const struct aura_object ** obj, struct aura_buffer **retbuf)
754 {
755  struct aura_eventloop *loop = aura_eventsys_get_autocreate(node);
756 
757  while (!node->sync_event_count) {
758  aura_handle_events(loop);
759  }
760 
761  *retbuf = aura_dequeue_buffer(&node->event_buffers);
762  if (!(*retbuf))
763  aura_panic(node);
764 
765  *obj = (const struct aura_object *)(*retbuf)->object;
766  node->sync_event_count--;
767  return 0;
768 }
769 
789 void aura_call_fail(struct aura_node *node, struct aura_object *o)
790 {
791  if (o->pending && o->calldonecb)
792  o->calldonecb(node, AURA_CALL_TRANSPORT_FAIL, NULL, o->arg);
793  if (o->pending)
794  o->pending--;
795 
796  node->sync_call_result = AURA_CALL_TRANSPORT_FAIL;
797  node->sync_ret_buf = NULL;
798 }
799 
808 void aura_set_status(struct aura_node *node, int status)
809 {
810  int oldstatus = node->status;
811  node->status = status;
812 
813  if (oldstatus == status)
814  return;
815 
816  if (node->is_opening)
817  BUG(node, "Transport BUG: Do not call aura_set_status in open()");
818 
819  if ((oldstatus == AURA_STATUS_OFFLINE) && (status == AURA_STATUS_ONLINE)) {
820  /* Dump etable */
821  int i;
822  slog(2, SLOG_INFO, "Node %s is now going online", node->tr->name);
823  slog(2, SLOG_INFO, "--- Dumping export table ---");
824  for (i=0; i< node->tbl->next; i++) {
825  slog(2, SLOG_INFO, "%d. %s %s %s(%s ) [out %d bytes] | [in %d bytes] ",
826  node->tbl->objects[i].id,
827  object_is_method((&node->tbl->objects[i])) ? "METHOD" : "EVENT ",
828  node->tbl->objects[i].ret_pprinted,
829  node->tbl->objects[i].name,
830  node->tbl->objects[i].arg_pprinted,
831  node->tbl->objects[i].arglen,
832  node->tbl->objects[i].retlen);
833  }
834  slog(1, SLOG_INFO, "-------------8<-------------");
835  }
836  if ((oldstatus == AURA_STATUS_ONLINE) && (status == AURA_STATUS_OFFLINE)) {
837  int i;
838 
839  slog(2, SLOG_INFO, "Node %s going offline, clearing outbound queue",
840  node->tr->name);
841  cleanup_buffer_queue(&node->outbound_buffers, false);
842  /* Handle any remaining inbound messages */
843  aura_handle_inbound(node);
844  /* Cancel any pending calls */
845  for (i=0; i < node->tbl->next; i++) {
846  struct aura_object *o;
847  o=&node->tbl->objects[i];
848  if (o->pending && o->calldonecb)
849  o->calldonecb(node, AURA_CALL_TRANSPORT_FAIL, NULL, o->arg);
850  if (o->pending)
851  o->pending--;
852  }
853  /* If any of the synchronos calls are running - inform them */
854  node->sync_call_result = AURA_CALL_TRANSPORT_FAIL;
855  node->sync_ret_buf = NULL;
856  }
857 
858  if (node->status_changed_cb)
859  node->status_changed_cb(node, status, node->status_changed_arg);
860 }
861 
870 void aura_set_node_endian(struct aura_node *node, enum aura_endianness en)
871 {
872  if (aura_get_host_endianness() != en)
873  node->need_endian_swap = true;
874 }
886 void aura_process_node_event(struct aura_node *node, const struct aura_pollfds *fd)
887 {
888  if (fd && node->tr->loop)
889  node->tr->loop(node, fd);
890 
891  uint64_t curtime = aura_platform_timestamp();
892 
893  if ((curtime - node->last_checked > node->poll_timeout) && node->tr->loop) {
894  node->tr->loop(node, NULL);
895  node->last_checked = curtime;
896  }
897 
898  /* Now grab all we got from the inbound queue and fire the callbacks */
899  aura_handle_inbound(node);
900 }
901 
902 void aura_eventloop_set_data(struct aura_node *node, struct aura_eventloop *data)
903 {
904  node->eventsys_data = data;
905 }
906 
907 
int aura_get_next_event(struct aura_node *node, const struct aura_object **obj, struct aura_buffer **retbuf)
Definition: aura.c:753
void aura_set_status(struct aura_node *node, int status)
Definition: aura.c:808
struct aura_buffer * aura_dequeue_buffer(struct list_head *head)
Definition: queue.c:45
int aura_core_call(struct aura_node *node, struct aura_object *o, struct aura_buffer **retbuf, struct aura_buffer *argbuf)
Definition: aura.c:412
#define aura_eventloop_create(...)
Definition: aura.h:759
Definition: list.h:61
void aura_wait_status(struct aura_node *node, int status)
Definition: aura.c:607
struct list_head qentry
Definition: aura.h:345
struct aura_eventloop * aura_eventloop_get_data(struct aura_node *node)
Definition: aura.c:243
static void aura_buffer_rewind(struct aura_buffer *buf)
Definition: inlines.h:93
void aura_object_migration_failed_cb(struct aura_node *node, void(*cb)(struct aura_node *node, struct aura_object *failed, void *arg), void *arg)
Definition: aura.c:339
int aura_set_event_callback_raw(struct aura_node *node, int id, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg)
Definition: aura.c:505
void aura_handle_events(struct aura_eventloop *loop)
Definition: eventloop.c:201
int aura_call_raw(struct aura_node *node, int id, struct aura_buffer **retbuf,...)
Definition: aura.c:626
void aura_eventloop_destroy(struct aura_eventloop *loop)
Definition: eventloop.c:166
int aura_get_pending_events(struct aura_node *node)
Definition: aura.c:730
int aura_call(struct aura_node *node, const char *name, struct aura_buffer **retbuf,...)
Definition: aura.c:666
struct aura_node * aura_open(const char *name, const char *opts)
Definition: aura.c:34
void aura_fd_changed_cb(struct aura_node *node, void(*cb)(const struct aura_pollfds *fd, enum aura_fd_action act, void *arg), void *arg)
Definition: aura.c:275
void aura_call_fail(struct aura_node *node, struct aura_object *o)
Definition: aura.c:789
int aura_start_call_raw(struct aura_node *node, int id, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg,...)
Definition: aura.c:462
void aura_eventloop_del(struct aura_node *node)
Definition: eventloop.c:69
void aura_queue_buffer(struct list_head *list, struct aura_buffer *buf)
Definition: queue.c:16
int aura_core_start_call(struct aura_node *node, struct aura_object *o, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg, struct aura_buffer *buf)
Definition: aura.c:361
void aura_enable_sync_events(struct aura_node *node, int count)
Definition: aura.c:711
const struct aura_object * aura_get_current_object(struct aura_node *node)
Obtain the pointer to the current aura_object.
Definition: aura.c:226
void aura_status_changed_cb(struct aura_node *node, void(*cb)(struct aura_node *node, int newstatus, void *arg), void *arg)
Definition: aura.c:258
void aura_etable_changed_cb(struct aura_node *node, void(*cb)(struct aura_node *node, struct aura_export_table *old, struct aura_export_table *new, void *arg), void *arg)
Definition: aura.c:295
void aura_unhandled_evt_cb(struct aura_node *node, void(*cb)(struct aura_node *node, struct aura_buffer *buf, void *arg), void *arg)
Definition: aura.c:314
void aura_buffer_release(struct aura_buffer *buf)
Definition: buffer.c:80
void aura_close(struct aura_node *node)
Definition: aura.c:102
void aura_set_node_endian(struct aura_node *node, enum aura_endianness en)
Definition: aura.c:870
void aura_buffer_destroy(struct aura_buffer *buf)
Definition: buffer.c:101
int aura_start_call(struct aura_node *node, const char *name, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg,...)
Definition: aura.c:564
struct aura_object * object
Definition: aura.h:341
int aura_set_event_callback(struct aura_node *node, const char *event, void(*calldonecb)(struct aura_node *dev, int status, struct aura_buffer *ret, void *arg), void *arg)
Definition: aura.c:536