librfn
An ad-hoc utility library
fibre.c
Go to the documentation of this file.
1 /*
2  * fibre.c
3  *
4  * Part of librfn (a general utility library from redfelineninja.org.uk)
5  *
6  * Copyright (C) 2013 Daniel Thompson <daniel@redfelineninja.org.uk>
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU Lesser General Public License as published
10  * by the Free Software Foundation; either version 3 of the License, or
11  * (at your option) any later version.
12  */
13 
14 #include "librfn/fibre.h"
15 
16 #include <assert.h>
17 #include <stdint.h>
18 #include <stdlib.h>
19 #include <string.h>
20 
21 #include "librfn/atomic.h"
22 #include "librfn/list.h"
23 #include "librfn/messageq.h"
24 #include "librfn/util.h"
25 
26 static fibre_t *atomic_runq_buf[8];
27 
28 static struct {
31  uint32_t now;
32 
36 
37  atomic_uint taint_flags;
38 } kernel = {
39  .runq = LIST_VAR_INIT,
40  .atomic_runq = MESSAGEQ_VAR_INIT(
41  atomic_runq_buf,
42  sizeof(atomic_runq_buf), sizeof(atomic_runq_buf[0])),
43  .timerq = LIST_VAR_INIT
44 };
45 
46 static void handle_atomic_runq(void)
47 {
48  fibre_t **f;
49 
50  while (NULL != (f = messageq_receive(&kernel.atomic_runq))) {
51  fibre_run(*f);
52  messageq_release(&kernel.atomic_runq, f);
53  }
54 }
55 
56 static void handle_timerq(void)
57 {
58  list_iterator_t iter;
59 
60  list_node_t *node = list_iterate(&kernel.timerq, &iter);
61  fibre_t *timeout_fibre = containerof(node, fibre_t, link);
62  while (NULL != node &&
63  cyclecmp32(timeout_fibre->duetime, kernel.now) <= 0) {
64  node = list_iterator_remove(&iter);
65  list_insert(&kernel.runq, &timeout_fibre->link);
66  timeout_fibre = containerof(node, fibre_t, link);
67  }
68 
69 }
70 
71 static fibre_t *get_next_task(void)
72 {
73  list_node_t *node = list_extract(&kernel.runq);
74  if (!node)
75  return NULL;
76 
77  return containerof(node, fibre_t, link);
78 }
79 
80 static void update_current_state(void)
81 {
82  kernel.current->state = kernel.state;
83 
84  switch (kernel.state) {
86  fibre_run(kernel.current);
87  break;
88  case FIBRE_STATE_EXITED:
89  PT_INIT(&kernel.current->priv);
90  break;
92  break;
93  default:
94  assert(0);
95  break;
96  }
97 
98  // preserve kernel.current since we permit fibre_self() to be called
99  // from the main loop in order to implement task accounting.
100 }
101 
102 static uint32_t get_next_wakeup(void)
103 {
104  if (!messageq_empty(&kernel.atomic_runq) || !list_empty(&kernel.runq))
105  return kernel.now;
106 
107  if (list_empty(&kernel.timerq))
108  return kernel.now + FIBRE_UNBOUNDED_SLEEP;
109 
110  fibre_t *fibre = containerof(list_peek(&kernel.timerq), fibre_t, link);
111  return fibre->duetime;
112 }
113 
114 static void add_taint(char id)
115 {
116  id -= 'A';
117  assert(id < 8*sizeof(kernel.taint_flags));
118  atomic_fetch_or(&kernel.taint_flags, 1 << id);
119 }
120 
121 static int duetime_cmp(list_node_t *n1, list_node_t *n2)
122 {
123  fibre_t *f1 = containerof(n1, fibre_t, link);
124  fibre_t *f2 = containerof(n2, fibre_t, link);
125 
126  return f1->duetime - f2->duetime;
127 }
128 
130 {
131  return kernel.current;
132 }
133 
134 uint32_t fibre_scheduler_next(uint32_t time)
135 {
136  kernel.now = time;
137 
138  /*
139  * When we have a single fibre yielding to itself we can create a
140  * fast path by skipping scheduler updates. This allows processor
141  * intensive work to be harmed as little as possible even when the
142  * fibre they run in seeks to cooperate with other fibres.
143  */
144  if (kernel.state != FIBRE_STATE_YIELDED ||
145  !list_empty(&kernel.runq) ||
146  !list_empty(&kernel.timerq) ||
147  !messageq_empty(&kernel.atomic_runq)) {
148  handle_atomic_runq();
149  if (kernel.current)
150  update_current_state();
151  handle_timerq();
152  kernel.current = get_next_task();
153  }
154 
155  if (kernel.current) {
156  kernel.state = kernel.current->fn(kernel.current);
157  if (kernel.state == FIBRE_STATE_YIELDED)
158  return kernel.now;
159  }
160 
161  return get_next_wakeup();
162 }
163 
165 {
166  memset(f, 0, sizeof(*f));
167 
168  f->fn = fn;
169  // TODO: list_node_init
170  //list_node_init(&f->link);
171 }
172 
174 {
175  handle_atomic_runq();
176 
177  if (!list_contains(&kernel.runq, &f->link, NULL)) {
178  (void) list_remove(&kernel.timerq, &f->link);
179  list_insert(&kernel.runq, &f->link);
180  }
181 }
182 
184 {
185  fibre_t **queued_fibre = messageq_claim(&kernel.atomic_runq);
186  if (!queued_fibre) {
187  add_taint('A');
188  return false;
189  }
190 
191  *queued_fibre = f;
192  messageq_send(&kernel.atomic_runq, queued_fibre);
193  return true;
194 }
195 
197 {
198  bool res = false;
199 
200  handle_atomic_runq();
201 
202  res |= list_remove(&kernel.runq, &f->link);
203  res |= list_remove(&kernel.timerq, &f->link);
204 
205  return res;
206 }
207 
208 bool fibre_timeout(uint32_t duetime)
209 {
210  if (cyclecmp32(duetime, kernel.now) <= 0)
211  return true;
212 
213  kernel.current->duetime = duetime;
214  if (!list_contains(&kernel.runq, &kernel.current->link, NULL))
215  list_insert_sorted(&kernel.timerq, &kernel.current->link, duetime_cmp);
216  return false;
217 }
218 
220  void *basep, size_t base_len, size_t msg_len)
221 {
222  fibre_init(&evtq->fibre, fn);
223  messageq_init(&evtq->eventq, basep, base_len, msg_len);
224 }
225 
227 {
228  void *evtp = messageq_claim(&evtq->eventq);
229  if (!evtp)
230  add_taint('E');
231  return evtp;
232 }
233 
234 bool fibre_eventq_send(fibre_eventq_t *evtq, void *evtp)
235 {
236  messageq_send(&evtq->eventq, evtp);
237  return fibre_run_atomic(&evtq->fibre);
238 }
239 
241 {
242  return messageq_empty(&evtq->eventq);
243 }
244 
246 {
247  return messageq_receive(&evtq->eventq);
248 }
249 
250 void fibre_eventq_release(fibre_eventq_t *evtq, void *evtp)
251 {
252  messageq_release(&evtq->eventq, evtp);
253 }
#define containerof(ptr, type, member)
Definition: util.h:35
void messageq_init(messageq_t *mq, void *basep, size_t base_len, size_t msg_len)
Definition: messageq.c:21
uint32_t duetime
Definition: fibre.h:68
list_node_t * list_iterate(list_t *list, list_iterator_t *iter)
Definition: list.c:91
fibre_state_t
Definition: fibre.h:34
Definition: list.h:39
void fibre_init(fibre_t *f, fibre_entrypoint_t *fn)
Dynamic initializer for a fibre descriptor.
Definition: fibre.c:164
fibre_t fibre
Definition: fibre.h:87
void * fibre_eventq_claim(fibre_eventq_t *evtq)
Request memory resources to send an event to a fibre.
Definition: fibre.c:226
bool list_contains(list_t *list, list_node_t *node, list_iterator_t *iter)
Definition: list.c:155
void messageq_release(messageq_t *mq, void *msg)
Definition: messageq.c:73
fibre_entrypoint_t * fn
Definition: fibre.h:65
uint32_t fibre_scheduler_next(uint32_t time)
Schedule the next fibre.
Definition: fibre.c:134
void * messageq_claim(messageq_t *mq)
Definition: messageq.c:31
bool fibre_eventq_empty(fibre_eventq_t *evtq)
Return true if the fibre's event queue is empty.
Definition: fibre.c:240
list_t timerq
Definition: fibre.c:35
void fibre_eventq_init(fibre_eventq_t *evtq, fibre_entrypoint_t *fn, void *basep, size_t base_len, size_t msg_len)
Dynamic initializer for a fibre and eventq descriptor.
Definition: fibre.c:219
bool fibre_kill(fibre_t *f)
Definition: fibre.c:196
fibre_state_t state
Definition: fibre.c:30
bool fibre_eventq_send(fibre_eventq_t *evtq, void *evtp)
Send an event to a fibre.
Definition: fibre.c:234
int fibre_entrypoint_t(struct fibre *)
Definition: fibre.h:59
list_node_t link
Definition: fibre.h:69
void fibre_eventq_release(fibre_eventq_t *evtq, void *evtp)
Release a message previously received by a fibre.
Definition: fibre.c:250
void * fibre_eventq_receive(fibre_eventq_t *evtq)
Receive a message previously sent to the fibre.
Definition: fibre.c:245
void * messageq_receive(messageq_t *mq)
Definition: messageq.c:57
list_node_t * list_iterator_remove(list_iterator_t *iter)
Definition: list.c:124
int32_t cyclecmp32(uint32_t a, uint32_t b)
Compares values that may be subject to overflow.
Definition: util.c:19
fibre_t * current
Definition: fibre.c:29
void list_insert(list_t *list, list_node_t *node)
Definition: list.c:21
bool fibre_run_atomic(fibre_t *f)
Definition: fibre.c:183
Definition: list.h:34
uint32_t now
Definition: fibre.c:31
messageq_t eventq
Definition: fibre.h:88
list_t runq
Definition: fibre.c:33
#define LIST_VAR_INIT
Definition: list.h:43
atomic_uint taint_flags
Definition: fibre.c:37
#define FIBRE_UNBOUNDED_SLEEP
An approximation of infinitely far in the future.
Definition: fibre.h:56
#define MESSAGEQ_VAR_INIT(basep, base_len, msg_len)
Definition: messageq.h:54
Fibre and eventq descriptor.
Definition: fibre.h:86
fibre_t * fibre_self()
Returns the currently active fibre descriptor.
Definition: fibre.c:129
void list_insert_sorted(list_t *list, list_node_t *node, list_node_compare_t *nodecmp)
Definition: list.c:34
list_node_t * list_extract(list_t *list)
Definition: list.c:77
void messageq_send(messageq_t *mq, void *msg)
Definition: messageq.c:50
messageq_t atomic_runq
Definition: fibre.c:34
Fibre descriptor.
Definition: fibre.h:64
void fibre_run(fibre_t *f)
Definition: fibre.c:173
bool fibre_timeout(uint32_t duetime)
Definition: fibre.c:208
bool list_remove(list_t *list, list_node_t *node)
Definition: list.c:171
#define PT_INIT(pt)
Definition: protothreads.h:88