From 8e6850107adb5ca37ed0e9a4500282b2e8743294 Mon Sep 17 00:00:00 2001 From: Kevin Chabowski Date: Sat, 20 Jul 2013 14:01:01 +0200 Subject: Rough structure done. Parallel working should work. --- nebula2.c | 268 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 263 insertions(+), 5 deletions(-) (limited to 'nebula2.c') diff --git a/nebula2.c b/nebula2.c index 480dfa2..9f0ae19 100644 --- a/nebula2.c +++ b/nebula2.c @@ -1,14 +1,276 @@ #include #include #include +#include +#include + +#include #include "config.h" +#include "statefile.h" +#include "render.h" +#include "mutex_helpers.h" void usage(void) { fputs("nebula2 needs the name of a config file as 1st argument.\n", stderr); } +/* Data that is shared between all processes. */ +typedef struct { + uint32_t* map; + uint32_t jobs_todo; + + pthread_mutex_t* jobrq_get_mu; + pthread_mutex_t* jobrq_set_mu; + int jobrq; +} nebula_data_t; + +/* Set the jobrq variable (e.g. request a new job) */ +void +jobrq_set(nebula_data_t* nd, int val) { + pthread_mutex_lock(nd->jobrq_set_mu); + nd->jobrq = val; + pthread_mutex_unlock(nd->jobrq_get_mu); +} + +/* Get the next jobrq value */ +int +jobrq_get(nebula_data_t* nd) { + int rv; + + pthread_mutex_lock(nd->jobrq_get_mu); + rv = nd->jobrq; + pthread_mutex_unlock(nd->jobrq_set_mu); + return rv; +} + +nebula_data_t* +nebula_data_create(config_t* conf) { + size_t mapsize; + nebula_data_t* nd = NULL; + + if(!(nd = malloc(sizeof(nebula_data_t)))) { + return NULL; + } + + mapsize = conf->width * conf->height * conf->iters_n; + + nd->map = NULL; + nd->jobrq_set_mu = NULL; + nd->jobrq_get_mu = NULL; + + if(!(nd->jobrq_get_mu = mutex_create())) { + fputs("Could not init jobrq_get_mu mutex.\n", stderr); + goto failed; + } + if(!(nd->jobrq_set_mu = mutex_create())) { + fputs("Could not init jobrq_set_mu mutex.\n", stderr); + goto failed; + } + /* The set mutex needs to be initially unlocked, so one worker can start requesting jobs. */ + pthread_mutex_unlock(nd->jobrq_set_mu); + + if(!(nd->map = malloc(sizeof(uint32_t) * mapsize))) { + fputs("Could not allocate memory for map.\n", stderr); + goto failed; + } + + return nd; + +failed: + if(nd->jobrq_set_mu) { + mutex_destroy(nd->jobrq_set_mu); + } + if(nd->jobrq_get_mu) { + mutex_destroy(nd->jobrq_get_mu); + } + if(nd->map) { + free(nd->map); + } + return NULL; +} + +void +nebula_data_destroy(nebula_data_t* nd) { + if(nd->map) { + free(nd->map); + } + mutex_destroy(nd->jobrq_set_mu); + mutex_destroy(nd->jobrq_get_mu); + free(nd); +} + +/* Data of a single worker */ +typedef struct { + int id; + pthread_mutex_t* mu; + int ok; + + config_t* conf; + nebula_data_t* nd; + + pthread_t thread; + int thread_started; +} worker_data_t; + +/* The background worker */ +void* +worker(void* _wd) { + worker_data_t* wd = _wd; + nebula_data_t* nd = wd->nd; + + for(;; ) { + jobrq_set(nd, wd->id); + pthread_mutex_lock(wd->mu); + if(!wd->ok) { + return NULL; + } + /* TODO: Do magic... */ + } +} + +/* Init and run a worker */ +int +worker_init(worker_data_t* wd, int id, config_t* conf, nebula_data_t* nd) { + wd->id = id; + wd->ok = 1; + wd->conf = conf; + wd->nd = nd; + wd->thread_started = 0; + + if(!(wd->mu = mutex_create())) { + return 0; + } + + if(pthread_create(&(wd->thread), NULL, worker, wd) != 0) { + mutex_destroy(wd->mu); + return 0; + } + + wd->thread_started = 1; + return 1; +} + +void +worker_cleanup(worker_data_t* wd) { + if(wd->thread_started) { + pthread_join(wd->thread, NULL); + } + mutex_destroy(wd->mu); +} + +/* Global reference to shared data (needed by sighandler) */ +nebula_data_t* global_nd; + +void +sighandler(int sig) { + switch(sig) { + case SIGINT: + jobrq_set(global_nd, -1); + break; + case SIGUSR1: + printf("Jobs todo: %d\n", global_nd->jobs_todo); + break; + } +} + +int +setup_sighandler(void) { + struct sigaction action; + action.sa_handler = sighandler; + action.sa_flags = SA_RESTART; + sigemptyset(&action.sa_mask); + + if(sigaction(SIGINT, &action, NULL) != 0) { + return 0; + } + if(sigaction(SIGUSR1, &action, NULL) != 0) { + return 0; + } + return 1; +} + +int +nebula2(config_t* conf) { + int rv = 1; + nebula_data_t* nd = NULL; + uint32_t jobs_done; + worker_data_t* workers; + int i; + int rq; + int workers_alive = 0; + + if(!(nd = nebula_data_create(conf))) { + goto tidyup; + } + global_nd = nd; + + if(!state_load(conf, nd->map, &jobs_done)) { + fprintf(stderr, "Error while loading state: %s\n", strerror(errno)); + goto tidyup; + } + nd->jobs_todo = conf->jobs - jobs_done; + + if(!(workers = calloc(conf->procn, sizeof(worker_data_t)))) { + fputs("Could not allocate memory for worker data.\n", stderr); + goto tidyup; + } + for(i = 0; i < conf->procn; i++) { + if(!(worker_init(&(workers[i]), i, conf, nd))) { + fputs("Could not init worker.\n", stderr); + goto tidyup; + } + workers_alive++; + } + + if(!setup_sighandler()) { + fprintf(stderr, "Error while configuring signals: %s\n", strerror(errno)); + goto tidyup; + } + + /* We need to manually unlock the set mutex once, so the first worker is allowed to set jobrq. */ + while(nd->jobs_todo > 0) { + rq = jobrq_get(nd); + if(rq < 0) { + break; + } + + workers[rq].ok = 1; + (nd->jobs_todo)--; + pthread_mutex_unlock(workers[rq].mu); + } + + if(!(state_save(conf, nd->map, conf->jobs - nd->jobs_todo))) { + fprintf(stderr, "Error while saving state: %s\n", strerror(errno)); + goto tidyup; + } + + rv = render(conf, nd->map) ? 0 : 1; + +tidyup: + while(workers_alive > 0) { + rq = jobrq_get(nd); + if(rq >= 0) { + workers[rq].ok = 0; + pthread_mutex_unlock(workers[rq].mu); + workers_alive--; + } + } + + if(nd) { + nebula_data_destroy(nd); + } + + if(workers) { + for(i = 0; i < conf->procn; i++) { + worker_cleanup(&(workers[i])); + } + free(workers); + } + return rv; +} + int main(int argc, char** argv) { int rv = 1; @@ -23,11 +285,7 @@ main(int argc, char** argv) { goto tidyup; } - conf_print(conf); - - /*nebula2(conf);*/ - - rv = 0; /* All OK, we return with no error. */ + rv = nebula2(conf); tidyup: if(conf) { conf_destroy(conf); -- cgit v1.2.3-54-g00ecf