Another copy of my dotfiles. Because I don't completely trust GitHub.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

177 lines
3.7 KiB

/* See LICENSE file for copyright and license details. */
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include "connection.h"
#include "queue.h"
#include "server.h"
#include "util.h"
/* per-thread arguments handed to server_worker() */
struct worker_data {
	const struct server *srv; /* server settings passed on to connection_serve() */
	size_t nslots;            /* number of connection slots and queue events */
	int insock;               /* listening socket that is accepted on */
};
/*
 * Thread entry point running one event loop.
 *
 * data must point to a struct worker_data. The worker allocates
 * d->nslots connection slots and an event array of the same size,
 * creates its own event queue, registers d->insock on it and then
 * loops forever: events without attached data lead to
 * connection_accept(), events carrying a connection are advanced with
 * connection_serve() and rearmed according to the connection state.
 *
 * Setup failures and a failing queue_wait() end in die() or exit(1).
 * The loop has no exit, so the final return is never reached.
 */
static void *
server_worker(void *data)
{
	queue_event *event = NULL;
	struct connection *connection, *c, *newc;
	struct worker_data *d = (struct worker_data *)data;
	int qfd;
	ssize_t nready;
	size_t i;

	/* allocate connections */
	if (!(connection = calloc(d->nslots, sizeof(*connection)))) {
		die("calloc:");
	}

	/* create event queue */
	if ((qfd = queue_create()) < 0) {
		exit(1);
	}

	/* add insock to the interest list (with data=NULL) */
	if (queue_add_fd(qfd, d->insock, QUEUE_EVENT_IN, 1, NULL) < 0) {
		exit(1);
	}

	/* allocate event array */
	if (!(event = reallocarray(event, d->nslots, sizeof(*event)))) {
		die("reallocarray:");
	}

	for (;;) {
		/* wait for new activity */
		if ((nready = queue_wait(qfd, event, d->nslots)) < 0) {
			exit(1);
		}

		/* handle events */
		for (i = 0; i < (size_t)nready; i++) {
			/* NULL data marks the listening socket (see above) */
			c = queue_event_get_data(&event[i]);

			if (queue_event_is_error(&event[i])) {
				if (c != NULL) {
					/*
					 * drop the broken connection; status
					 * is zeroed before it is logged
					 */
					queue_rem_fd(qfd, c->fd);
					c->res.status = 0;
					connection_log(c);
					connection_reset(c);
				}

				continue;
			}

			if (c == NULL) {
				/* add new connection to the interest list */
				if (!(newc = connection_accept(d->insock,
				                               connection,
				                               d->nslots))) {
					/*
					 * the socket is either blocking
					 * or something failed.
					 * In both cases, we just carry on
					 */
					continue;
				}

				/*
				 * add event to the interest list
				 * (we want IN, because we start
				 * with receiving the header)
				 */
				if (queue_add_fd(qfd, newc->fd,
				                 QUEUE_EVENT_IN,
				                 0, newc) < 0) {
					/*
					 * no event will ever be delivered
					 * for this connection, so release it
					 * with connection_reset() exactly as
					 * is done when queue_mod_fd() fails
					 * below, instead of leaving the
					 * accepted connection dangling
					 */
					connection_reset(newc);
					continue;
				}
			} else {
				/* serve existing connection */
				connection_serve(c, d->srv);

				if (c->fd == 0) {
					/* we are done */
					memset(c, 0, sizeof(struct connection));
					continue;
				}

				/*
				 * rearm the event based on the state
				 * we are "stuck" at
				 */
				switch (c->state) {
				case C_RECV_HEADER:
					if (queue_mod_fd(qfd, c->fd,
					                 QUEUE_EVENT_IN,
					                 c) < 0) {
						connection_reset(c);
					}
					break;
				case C_SEND_HEADER:
				case C_SEND_BODY:
					if (queue_mod_fd(qfd, c->fd,
					                 QUEUE_EVENT_OUT,
					                 c) < 0) {
						connection_reset(c);
					}
					break;
				default:
					break;
				}
			}
		}
	}

	/* not reached */
	return NULL;
}
/*
 * Start nthreads worker threads running server_worker() and block
 * until all of them have been joined.
 *
 * Every worker receives its own struct worker_data carrying insock,
 * nslots and srv. Allocation and thread-creation failures are passed
 * to die(); a failing pthread_join() is only reported with warn().
 */
void
server_init_thread_pool(int insock, size_t nthreads, size_t nslots,
                        const struct server *srv)
{
	pthread_t *thread = NULL;
	struct worker_data *d = NULL;
	size_t i, njoined = 0;

	/* allocate worker_data structs */
	if (!(d = reallocarray(d, nthreads, sizeof(*d)))) {
		die("reallocarray:");
	}
	for (i = 0; i < nthreads; i++) {
		d[i].insock = insock;
		d[i].nslots = nslots;
		d[i].srv = srv;
	}

	/* allocate and initialize thread pool */
	if (!(thread = reallocarray(thread, nthreads, sizeof(*thread)))) {
		die("reallocarray:");
	}
	for (i = 0; i < nthreads; i++) {
		/*
		 * pthread_create() reports failure through its return
		 * value and does not set errno, so store the code in
		 * errno (as done for pthread_join() below) before
		 * inspecting it
		 */
		errno = pthread_create(&thread[i], NULL, server_worker,
		                       &d[i]);
		if (errno != 0) {
			if (errno == EAGAIN) {
				die("You need to run as root or have "
				    "CAP_SYS_RESOURCE set, or are trying "
				    "to create more threads than the "
				    "system can offer");
			} else {
				die("pthread_create:");
			}
		}
	}

	/* wait for threads */
	for (i = 0; i < nthreads; i++) {
		if ((errno = pthread_join(thread[i], NULL))) {
			warn("pthread_join:");
		} else {
			njoined++;
		}
	}

	/*
	 * the handles are no longer needed; the worker arguments are
	 * only released when every worker is known to have finished,
	 * because a thread that could not be joined may still read them
	 */
	free(thread);
	if (njoined == nthreads) {
		free(d);
	}
}