From 8e6850107adb5ca37ed0e9a4500282b2e8743294 Mon Sep 17 00:00:00 2001 From: Kevin Chabowski Date: Sat, 20 Jul 2013 14:01:01 +0200 Subject: Rough structure done. Parallel working should work. --- Makefile | 10 +-- config.c | 4 +- config.h | 2 +- example.ini | 2 +- mutex_helpers.c | 30 +++++++ mutex_helpers.h | 12 +++ nebula2.c | 268 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- render.c | 12 +++ render.h | 10 +++ statefile.c | 72 +++++++++++++++ statefile.h | 10 +++ 11 files changed, 418 insertions(+), 14 deletions(-) create mode 100644 mutex_helpers.c create mode 100644 mutex_helpers.h create mode 100644 render.c create mode 100644 render.h create mode 100644 statefile.c create mode 100644 statefile.h diff --git a/Makefile b/Makefile index d3c357f..238d359 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ CC=gcc -CFLAGS=-Wall -Werror -pedantic -std=c99 -static -LIBS=-Liniparser -liniparser +CFLAGS=-Wall -Werror -pedantic +LIBS=-lpthread -nebula2: nebula2.o config.o iniparser/libiniparser.a - $(CC) $(CFLAGS) -o nebula2 nebula2.o config.o iniparser/libiniparser.a +nebula2: nebula2.o config.o render.o statefile.o mutex_helpers.o iniparser/libiniparser.a + $(CC) $(CFLAGS) $(LIBS) -o nebula2 nebula2.o config.o render.o statefile.o mutex_helpers.o iniparser/libiniparser.a iniparser/libiniparser.a: make -C iniparser libiniparser.a @@ -15,4 +15,4 @@ clean: rm -f *.o nuke: clean - rm -f nebula2 \ No newline at end of file + rm -f nebula2 diff --git a/config.c b/config.c index 032ea6a..72843e8 100644 --- a/config.c +++ b/config.c @@ -118,7 +118,7 @@ conf_load(char* path, config_t** conf) { (!conf_get_int(ini, "nebula2:height", &((*conf)->height))) || (!conf_get_int(ini, "nebula2:jobsize", &((*conf)->jobsize))) || (!conf_get_int(ini, "nebula2:jobs", &((*conf)->jobs))) || - (!conf_get_int(ini, "nebula2:procn", &((*conf)->procn)))) { + (!conf_get_int(ini, "nebula2:threads", &((*conf)->threads)))) { goto failed; } @@ -197,7 +197,7 @@ conf_print(config_t* conf) { printf("height: %d\n", conf->height); printf("jobsize: %d\n", conf->jobsize); printf("jobs: %d\n", conf->jobs); - printf("procn: %d\n", conf->procn); + printf("threads: %d\n", conf->threads); printf("statefile: %s\n", conf->statefile); printf("output: %s\n", conf->output); diff --git a/config.h b/config.h index 11923eb..9959c22 100644 --- a/config.h +++ b/config.h @@ -5,7 +5,7 @@ typedef struct { int width, height; - int jobsize, jobs, procn; + int jobsize, jobs, threads; char* statefile; char* output; diff --git a/example.ini b/example.ini index 79826a7..fe42a1f 100644 --- a/example.ini +++ b/example.ini @@ -4,7 +4,7 @@ height=600 jobsize=100000 jobs=10000 -procn=6 +threads=6 statefile=example.state diff --git a/mutex_helpers.c b/mutex_helpers.c new file mode 100644 index 0000000..e390316 --- /dev/null +++ b/mutex_helpers.c @@ -0,0 +1,30 @@ +#include +#include + +/* Creating a locked mutex */ +pthread_mutex_t* +mutex_create() { + pthread_mutex_t* mu; + + if(!(mu = malloc(sizeof(pthread_mutex_t)))) { + return NULL; + } + + if(pthread_mutex_init(mu, NULL) != 0) { + free(mu); + return NULL; + } + + pthread_mutex_trylock(mu); + + return mu; +} + +/* Destroy a mutex created by mutex_create */ +void +mutex_destroy(pthread_mutex_t* mu) { + if(mu) { + pthread_mutex_unlock(mu); + pthread_mutex_destroy(mu); + } +} diff --git a/mutex_helpers.h b/mutex_helpers.h new file mode 100644 index 0000000..0cdc7b6 --- /dev/null +++ b/mutex_helpers.h @@ -0,0 +1,12 @@ +#ifndef _nebula2_mutex_helpers_h_ +#define _nebula2_mutex_helpers_h_ + +#include + +/* Creating a locked mutex */ +extern pthread_mutex_t* mutex_create(); + +/* Destroy a mutex created by mutex_create */ +extern void mutex_destroy(pthread_mutex_t* mu); + +#endif diff --git a/nebula2.c b/nebula2.c index 480dfa2..9f0ae19 100644 --- a/nebula2.c +++ b/nebula2.c @@ -1,14 +1,276 @@ #include #include #include +#include +#include + +#include #include "config.h" +#include "statefile.h" +#include "render.h" +#include "mutex_helpers.h" void usage(void) { fputs("nebula2 needs the name of a config file as 1st argument.\n", stderr); } +/* Data that is shared between all processes. */ +typedef struct { + uint32_t* map; + uint32_t jobs_todo; + + pthread_mutex_t* jobrq_get_mu; + pthread_mutex_t* jobrq_set_mu; + int jobrq; +} nebula_data_t; + +/* Set the jobrq variable (e.g. request a new job) */ +void +jobrq_set(nebula_data_t* nd, int val) { + pthread_mutex_lock(nd->jobrq_set_mu); + nd->jobrq = val; + pthread_mutex_unlock(nd->jobrq_get_mu); +} + +/* Get the next jobrq value */ +int +jobrq_get(nebula_data_t* nd) { + int rv; + + pthread_mutex_lock(nd->jobrq_get_mu); + rv = nd->jobrq; + pthread_mutex_unlock(nd->jobrq_set_mu); + return rv; +} + +nebula_data_t* +nebula_data_create(config_t* conf) { + size_t mapsize; + nebula_data_t* nd = NULL; + + if(!(nd = malloc(sizeof(nebula_data_t)))) { + return NULL; + } + + mapsize = conf->width * conf->height * conf->iters_n; + + nd->map = NULL; + nd->jobrq_set_mu = NULL; + nd->jobrq_get_mu = NULL; + + if(!(nd->jobrq_get_mu = mutex_create())) { + fputs("Could not init jobrq_get_mu mutex.\n", stderr); + goto failed; + } + if(!(nd->jobrq_set_mu = mutex_create())) { + fputs("Could not init jobrq_set_mu mutex.\n", stderr); + goto failed; + } + /* The set mutex needs to be initially unlocked, so one worker can start requesting jobs. */ + pthread_mutex_unlock(nd->jobrq_set_mu); + + if(!(nd->map = malloc(sizeof(uint32_t) * mapsize))) { + fputs("Could not allocate memory for map.\n", stderr); + goto failed; + } + + return nd; + +failed: + if(nd->jobrq_set_mu) { + mutex_destroy(nd->jobrq_set_mu); + } + if(nd->jobrq_get_mu) { + mutex_destroy(nd->jobrq_get_mu); + } + if(nd->map) { + free(nd->map); + } + return NULL; +} + +void +nebula_data_destroy(nebula_data_t* nd) { + if(nd->map) { + free(nd->map); + } + mutex_destroy(nd->jobrq_set_mu); + mutex_destroy(nd->jobrq_get_mu); + free(nd); +} + +/* Data of a single worker */ +typedef struct { + int id; + pthread_mutex_t* mu; + int ok; + + config_t* conf; + nebula_data_t* nd; + + pthread_t thread; + int thread_started; +} worker_data_t; + +/* The background worker */ +void* +worker(void* _wd) { + worker_data_t* wd = _wd; + nebula_data_t* nd = wd->nd; + + for(;; ) { + jobrq_set(nd, wd->id); + pthread_mutex_lock(wd->mu); + if(!wd->ok) { + return NULL; + } + /* TODO: Do magic... */ + } +} + +/* Init and run a worker */ +int +worker_init(worker_data_t* wd, int id, config_t* conf, nebula_data_t* nd) { + wd->id = id; + wd->ok = 1; + wd->conf = conf; + wd->nd = nd; + wd->thread_started = 0; + + if(!(wd->mu = mutex_create())) { + return 0; + } + + if(pthread_create(&(wd->thread), NULL, worker, wd) != 0) { + mutex_destroy(wd->mu); + return 0; + } + + wd->thread_started = 1; + return 1; +} + +void +worker_cleanup(worker_data_t* wd) { + if(wd->thread_started) { + pthread_join(wd->thread, NULL); + } + mutex_destroy(wd->mu); +} + +/* Global reference to shared data (needed by sighandler) */ +nebula_data_t* global_nd; + +void +sighandler(int sig) { + switch(sig) { + case SIGINT: + jobrq_set(global_nd, -1); + break; + case SIGUSR1: + printf("Jobs todo: %d\n", global_nd->jobs_todo); + break; + } +} + +int +setup_sighandler(void) { + struct sigaction action; + action.sa_handler = sighandler; + action.sa_flags = SA_RESTART; + sigemptyset(&action.sa_mask); + + if(sigaction(SIGINT, &action, NULL) != 0) { + return 0; + } + if(sigaction(SIGUSR1, &action, NULL) != 0) { + return 0; + } + return 1; +} + +int +nebula2(config_t* conf) { + int rv = 1; + nebula_data_t* nd = NULL; + uint32_t jobs_done; + worker_data_t* workers; + int i; + int rq; + int workers_alive = 0; + + if(!(nd = nebula_data_create(conf))) { + goto tidyup; + } + global_nd = nd; + + if(!state_load(conf, nd->map, &jobs_done)) { + fprintf(stderr, "Error while loading state: %s\n", strerror(errno)); + goto tidyup; + } + nd->jobs_todo = conf->jobs - jobs_done; + + if(!(workers = calloc(conf->procn, sizeof(worker_data_t)))) { + fputs("Could not allocate memory for worker data.\n", stderr); + goto tidyup; + } + for(i = 0; i < conf->procn; i++) { + if(!(worker_init(&(workers[i]), i, conf, nd))) { + fputs("Could not init worker.\n", stderr); + goto tidyup; + } + workers_alive++; + } + + if(!setup_sighandler()) { + fprintf(stderr, "Error while configuring signals: %s\n", strerror(errno)); + goto tidyup; + } + + /* We need to manually unlock the set mutex once, so the first worker is allowed to set jobrq. */ + while(nd->jobs_todo > 0) { + rq = jobrq_get(nd); + if(rq < 0) { + break; + } + + workers[rq].ok = 1; + (nd->jobs_todo)--; + pthread_mutex_unlock(workers[rq].mu); + } + + if(!(state_save(conf, nd->map, conf->jobs - nd->jobs_todo))) { + fprintf(stderr, "Error while saving state: %s\n", strerror(errno)); + goto tidyup; + } + + rv = render(conf, nd->map) ? 0 : 1; + +tidyup: + while(workers_alive > 0) { + rq = jobrq_get(nd); + if(rq >= 0) { + workers[rq].ok = 0; + pthread_mutex_unlock(workers[rq].mu); + workers_alive--; + } + } + + if(nd) { + nebula_data_destroy(nd); + } + + if(workers) { + for(i = 0; i < conf->procn; i++) { + worker_cleanup(&(workers[i])); + } + free(workers); + } + return rv; +} + int main(int argc, char** argv) { int rv = 1; @@ -23,11 +285,7 @@ main(int argc, char** argv) { goto tidyup; } - conf_print(conf); - - /*nebula2(conf);*/ - - rv = 0; /* All OK, we return with no error. */ + rv = nebula2(conf); tidyup: if(conf) { conf_destroy(conf); diff --git a/render.c b/render.c new file mode 100644 index 0000000..b64aa25 --- /dev/null +++ b/render.c @@ -0,0 +1,12 @@ +#include +#include +#include + +#include "config.h" +#include "color.h" + +int +render(config_t* conf, uint32_t* map) { + /* TODO */ + return 1; +} diff --git a/render.h b/render.h new file mode 100644 index 0000000..f903bb1 --- /dev/null +++ b/render.h @@ -0,0 +1,10 @@ +#ifndef _nebula2_render_h_ +#define _nebula2_render_h_ + +#include + +#include "config.h" + +int render(config_t* conf, uint32_t* map); + +#endif diff --git a/statefile.c b/statefile.c new file mode 100644 index 0000000..08fb372 --- /dev/null +++ b/statefile.c @@ -0,0 +1,72 @@ +#include +#include +#include +#include + +#include "config.h" + +int +state_load(config_t* conf, uint32_t* map, uint32_t* jobs_done) { + FILE* fh = NULL; + size_t mapsize; + int errsv; + + mapsize = conf->width * conf->height * conf->iters_n; + + if(!(fh = fopen(conf->statefile, "rb"))) { + if(errno == ENOENT) { + memset(map, 0, mapsize); + *jobs_done = 0; + return 1; + } + + return 0; + } + + if(fread(jobs_done, sizeof(uint32_t), 1, fh) != 1) { + errsv = errno; + fclose(fh); + errno = errsv; + return 0; + } + + if(fread(map, sizeof(uint32_t), mapsize, fh) != mapsize) { + errsv = errno; + fclose(fh); + errno = errsv; + return 0; + } + + fclose(fh); + return 1; +} + +int +state_save(config_t* conf, uint32_t* map, uint32_t jobs_done) { + FILE* fh = NULL; + size_t mapsize; + int errsv; + + mapsize = conf->width * conf->height * conf->iters_n; + + if(!(fh = fopen(conf->statefile, "wb"))) { + return 0; + } + + if(fwrite(&jobs_done, sizeof(uint32_t), 1, fh) != 1) { + errsv = errno; + fclose(fh); + errno = errsv; + return 0; + } + + if(fwrite(map, sizeof(uint32_t), mapsize, fh) != mapsize) { + errsv = errno; + fclose(fh); + errno = errsv; + return 0; + } + + fclose(fh); + return 1; +} diff --git a/statefile.h b/statefile.h new file mode 100644 index 0000000..bdcb32f --- /dev/null +++ b/statefile.h @@ -0,0 +1,10 @@ +#ifndef _nebula2_statefile_h_ +#define _nebula2_statefile_h_ + +#include +#include "config.h" + +extern int state_load(config_t* conf, uint32_t* map, uint32_t* jobs_done); +extern int state_save(config_t* conf, uint32_t* map, uint32_t jobs_done); + +#endif -- cgit v1.2.3-54-g00ecf