|
|
- /* See LICENSE file for copyright and license details. */
- #include <errno.h>
- #include <pthread.h>
- #include <stddef.h>
- #include <stdlib.h>
- #include <string.h>
-
- #include "connection.h"
- #include "queue.h"
- #include "server.h"
- #include "util.h"
-
- struct worker_data {
- int insock;
- size_t nslots;
- const struct server *srv;
- };
-
- static void *
- server_worker(void *data)
- {
- queue_event *event = NULL;
- struct connection *connection, *c, *newc;
- struct worker_data *d = (struct worker_data *)data;
- int qfd;
- ssize_t nready;
- size_t i;
-
- /* allocate connections */
- if (!(connection = calloc(d->nslots, sizeof(*connection)))) {
- die("calloc:");
- }
-
- /* create event queue */
- if ((qfd = queue_create()) < 0) {
- exit(1);
- }
-
- /* add insock to the interest list (with data=NULL) */
- if (queue_add_fd(qfd, d->insock, QUEUE_EVENT_IN, 1, NULL) < 0) {
- exit(1);
- }
-
- /* allocate event array */
- if (!(event = reallocarray(event, d->nslots, sizeof(*event)))) {
- die("reallocarray:");
- }
-
- for (;;) {
- /* wait for new activity */
- if ((nready = queue_wait(qfd, event, d->nslots)) < 0) {
- exit(1);
- }
-
- /* handle events */
- for (i = 0; i < (size_t)nready; i++) {
- c = queue_event_get_data(&event[i]);
-
- if (queue_event_is_error(&event[i])) {
- if (c != NULL) {
- queue_rem_fd(qfd, c->fd);
- c->res.status = 0;
- connection_log(c);
- connection_reset(c);
- }
-
- continue;
- }
-
- if (c == NULL) {
- /* add new connection to the interest list */
- if (!(newc = connection_accept(d->insock,
- connection,
- d->nslots))) {
- /*
- * the socket is either blocking
- * or something failed.
- * In both cases, we just carry on
- */
- continue;
- }
-
- /*
- * add event to the interest list
- * (we want IN, because we start
- * with receiving the header)
- */
- if (queue_add_fd(qfd, newc->fd,
- QUEUE_EVENT_IN,
- 0, newc) < 0) {
- /* not much we can do here */
- continue;
- }
- } else {
- /* serve existing connection */
- connection_serve(c, d->srv);
-
- if (c->fd == 0) {
- /* we are done */
- memset(c, 0, sizeof(struct connection));
- continue;
- }
-
- /*
- * rearm the event based on the state
- * we are "stuck" at
- */
- switch(c->state) {
- case C_RECV_HEADER:
- if (queue_mod_fd(qfd, c->fd,
- QUEUE_EVENT_IN,
- c) < 0) {
- connection_reset(c);
- break;
- }
- break;
- case C_SEND_HEADER:
- case C_SEND_BODY:
- if (queue_mod_fd(qfd, c->fd,
- QUEUE_EVENT_OUT,
- c) < 0) {
- connection_reset(c);
- break;
- }
- break;
- default:
- break;
- }
- }
- }
- }
-
- return NULL;
- }
-
- void
- server_init_thread_pool(int insock, size_t nthreads, size_t nslots,
- const struct server *srv)
- {
- pthread_t *thread = NULL;
- struct worker_data *d = NULL;
- size_t i;
-
- /* allocate worker_data structs */
- if (!(d = reallocarray(d, nthreads, sizeof(*d)))) {
- die("reallocarray:");
- }
- for (i = 0; i < nthreads; i++) {
- d[i].insock = insock;
- d[i].nslots = nslots;
- d[i].srv = srv;
- }
-
- /* allocate and initialize thread pool */
- if (!(thread = reallocarray(thread, nthreads, sizeof(*thread)))) {
- die("reallocarray:");
- }
- for (i = 0; i < nthreads; i++) {
- if (pthread_create(&thread[i], NULL, server_worker, &d[i]) != 0) {
- if (errno == EAGAIN) {
- die("You need to run as root or have "
- "CAP_SYS_RESOURCE set, or are trying "
- "to create more threads than the "
- "system can offer");
- } else {
- die("pthread_create:");
- }
- }
- }
-
- /* wait for threads */
- for (i = 0; i < nthreads; i++) {
- if ((errno = pthread_join(thread[i], NULL))) {
- warn("pthread_join:");
- }
- }
- }
|