diff options
Diffstat (limited to 'src/roof.c')
-rw-r--r-- | src/roof.c | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/src/roof.c b/src/roof.c new file mode 100644 index 0000000..505e3aa --- /dev/null +++ b/src/roof.c @@ -0,0 +1,133 @@ +void roof_init() { + hw_sched_init(); +} + +Roof *roof_new(UfoRoofConfig *cfg, GError **error) { + guint i; + GError *gerr = NULL; + + Roof *ctx = (Roof*)calloc(1, sizeof(Roof)); + if (!ctx) roof_new_error(error, "Can't allocate Roof context"); + + ctx->cfg = cfg; + + ctx->n_threads = cfg->n_streams / cfg->sockets_per_thread; + if (cfg->n_streams % cfg->sockets_per_thread) ctx->n_threads++; + + ctx->rdi = (RoofReadInterface**)calloc(cfg->n_streams, sizeof(RoofReadInterface*)); + ctx->rdc = (RoofReadContext**)calloc(cfg->n_streams, sizeof(RoofReadContext*)); + ctx->rdt = (RoofThreadContext**)calloc(ctx->n_threads, sizeof(RoofThreadContext*)); + ctx->sched = hw_sched_create(cfg->n_threads); + + if ((!ctx->rdi)||(!ctx->rdc)||(!ctx->rdt)||(!ctx->sched)) { + roof_free(ctx); + roof_setup_error(error, "Failed to allocate memory for various Roof contexts"); + } + + return ctx; +} + +void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error) { + assert(ctx); + + ctx->simulate = 1; + ctx->path = path; + ctx->first_file_number = first_file_number; +} + +void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error) { + assert(ctx); + + ctx->max_datasets = max; +} + + +void roof_setup(Roof *ctx, GError **error) { + guint i; + GError *gerr = NULL; + + assert(ctx); + + RoofConfig *cfg = ctx->cfg; + +/* + ctx->buf = roof_buffer_new(cfg, 2, ctx->max_datasets, &gerr); + if ((gerr)||(!ctx->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); +*/ + + for (i = 0; i < cfg->n_streams; i++) { + if (ctx->simulate) { + if (!ctx->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + ctx->rdi[i] = roof_read_file_new(cfg, ctx->path, ctx->first_file_number + i, &gerr); + } else { + ctx->rdi[i] = roof_read_socket_new(cfg, i, &gerr); + } + + if (!ctx->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + + ctx->rdc[i] = roof_read_context_new(cfg, ctx->rdi[i], &gerr); + if (!ctx->rdc[i]) + roof_propagate_error(error, gerr, "roof_read_context_new: "); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint extra = 0, sockets_per_thread = cfg->n_streams / ctx->n_threads; + if (cfg->n_streams % ctx->n_threads) extra = cfg->n_streams - ctx->n_threads * sockets_per_thread; + + guint from, to; + for (i = 0; i < ctx->n_threads; i++) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= roof_thread_new(cfg, ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "roof_thread_new (%i): ", i); + } +} + + +void roof_free(Roof *ctx) { + guint i; + + if (ctx) { + RoofConfig *cfg = ctx->cfg; + if (ctx->sched) hw_sched_destroy(priv->sched); + + if (ctx->rdt) { + for (i = 0; i < ctx->n_threads; i++) + if (ctx->rdt[i]) roof_thread_context_free(ctx->rdt[i]); + free(ctx->rdt); + } + + if (ctx->rdc) { + for (i = 0; i < cfg->n_streams; i++) + if (ctx->rdc[i]) roof_read_context_free(ctx->rdc[i]); + free(ctx->rdc); + } + + if (ctx->rdi) { + for (i = 0; i < cfg->n_streams; i++) + if (ctx->rdi[i]) roof_read_interface_free(ctx->rdi[i]); + free(ctx->rdi); + } + + if (ctx->buf) roof_buffer_free(ctx->buf); + free(ctx); + } +} + + +void roof_read_dataset(Roof *ctx, void *buffer, GError **error) { + priv->current_dataset; + priv->current_buffer = buffer; + + err = hw_sched_schedule_thread_task(sched, (void*)ctx, roof_thread_read); + if (!err) err = hw_sched_wait_task(sched); + if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); } +} + + +} |