diff options
author | Suren A. Chilingaryan <csa@suren.me> | 2020-09-03 03:00:30 +0200 |
---|---|---|
committer | Suren A. Chilingaryan <csa@suren.me> | 2020-09-03 03:00:30 +0200 |
commit | 5172421d248250b4ab3b69eb57fd83656e23a4da (patch) | |
tree | a499d9f1dd0b74b754816884a59927b3171656fc /src | |
parent | 7b2e6168b049be9e7852b2d364d897592eff69fc (diff) | |
download | ufo-roof-temp-master.tar.gz ufo-roof-temp-master.tar.bz2 ufo-roof-temp-master.tar.xz ufo-roof-temp-master.zip |
Diffstat (limited to 'src')
-rw-r--r-- | src/CMakeLists.txt | 10 | ||||
-rw-r--r-- | src/hw_config.h | 8 | ||||
-rw-r--r-- | src/hw_sched.c | 398 | ||||
-rw-r--r-- | src/hw_sched.h | 146 | ||||
-rw-r--r-- | src/hw_thread.c | 143 | ||||
-rw-r--r-- | src/hw_thread.h | 76 | ||||
-rw-r--r-- | src/meson.build | 7 | ||||
-rw-r--r-- | src/roof-buffer.c (renamed from src/ufo-roof-buffer.c) | 85 | ||||
-rw-r--r-- | src/roof-buffer.h (renamed from src/ufo-roof-buffer.h) | 29 | ||||
-rw-r--r-- | src/roof-config.c (renamed from src/ufo-roof-config.c) | 52 | ||||
-rw-r--r-- | src/roof-config.h (renamed from src/ufo-roof-config.h) | 20 | ||||
-rw-r--r-- | src/roof-error.h (renamed from src/ufo-roof-error.h) | 9 | ||||
-rw-r--r-- | src/roof-read-file.c (renamed from src/ufo-roof-read-file.c) | 56 | ||||
-rw-r--r-- | src/roof-read-file.h | 8 | ||||
-rw-r--r-- | src/roof-read-socket.c (renamed from src/ufo-roof-read-socket.c) | 89 | ||||
-rw-r--r-- | src/roof-read-socket.h | 8 | ||||
-rw-r--r-- | src/roof-read.c | 107 | ||||
-rw-r--r-- | src/roof-read.h | 45 | ||||
-rw-r--r-- | src/roof-thread.c | 95 | ||||
-rw-r--r-- | src/roof-thread.h | 30 | ||||
-rw-r--r-- | src/roof.c | 133 | ||||
-rw-r--r-- | src/roof.h | 46 | ||||
-rw-r--r-- | src/save/memcpy.c | 344 | ||||
-rw-r--r-- | src/save/memcpy.h | 63 | ||||
-rw-r--r-- | src/save/ufo-roof-buffer-build-task.c | 474 | ||||
-rw-r--r-- | src/save/ufo-roof-read-thread.c | 155 | ||||
-rw-r--r-- | src/save/ufo-roof-read-thread.h | 23 | ||||
-rw-r--r-- | src/save/ufo-roof-read.c | 123 | ||||
-rw-r--r-- | src/save/ufo-roof-read.h | 61 | ||||
-rw-r--r-- | src/ufo-roof-build-task.c | 271 | ||||
-rw-r--r-- | src/ufo-roof-read-file.h | 8 | ||||
-rw-r--r-- | src/ufo-roof-read-socket.h | 8 | ||||
-rw-r--r-- | src/ufo-roof-read-task.c | 68 | ||||
-rw-r--r-- | src/ufo-roof-read.h | 17 | ||||
-rw-r--r-- | src/ufo-roof.h | 16 |
35 files changed, 2947 insertions, 284 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 720c18f..837a6fb 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 2.6) #{{{ Sources set(ufofilter_SRCS - ufo-roof-read-task.c ufo-roof-build-task.c ufo-roof-filter-task.c ufo-roof-flat-field-correct-task.c @@ -12,12 +11,11 @@ set(common_SRCS ufo-roof-config.c ) -set(roof_read_aux_SRCS +set(roof_build_aux_SRCS + hw_sched.c + hw_thread.c ufo-roof-read-socket.c ufo-roof-read-file.c - ) - -set(roof_build_aux_SRCS ufo-roof-buffer.c ) @@ -30,7 +28,7 @@ set(ufofilter_LIBS ${UFO_LIBRARIES} ${OpenCL_LIBRARIES}) -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations -g -gdwarf-2 -g3 -fno-omit-frame-pointer") add_definitions(-D_FILE_OFFSET_BITS=64 -D_LARGE_FILES) #}}} diff --git a/src/hw_config.h b/src/hw_config.h new file mode 100644 index 0000000..2351dea --- /dev/null +++ b/src/hw_config.h @@ -0,0 +1,8 @@ +#ifndef _HW_CONFIG_H +#define _HW_CONFIG_H + + // enable threading +#define HW_HAVE_SCHED_HEADERS +#define HW_USE_THREADS + +#endif diff --git a/src/hw_sched.c b/src/hw_sched.c new file mode 100644 index 0000000..ec4d812 --- /dev/null +++ b/src/hw_sched.c @@ -0,0 +1,398 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#define _GNU_SOURCE +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "hw_config.h" + +#ifdef HW_HAVE_SCHED_HEADERS +# include <sys/types.h> +# include <unistd.h> +# include <sched.h> +#endif /* HW_HAVE_SCHED_HEADERS */ + +#include "hw_sched.h" + + +#ifdef HW_USE_THREADS +# define MUTEX_INIT(ctx, name) \ + if (!err) { \ + ctx->name##_mutex = g_mutex_new(); \ + if (!ctx->name##_mutex) err = 1; \ + } + +# define MUTEX_FREE(ctx, name) \ + if (ctx->name##_mutex) g_mutex_free(ctx->name##_mutex); + +# define COND_INIT(ctx, name) \ + MUTEX_INIT(ctx, name##_cond) \ + if (!err) { \ + ctx->name##_cond = g_cond_new(); \ + if (!ctx->name##_cond) { \ + err = 1; \ + MUTEX_FREE(ctx, name##_cond) \ + } \ + } + +# define COND_FREE(ctx, name) \ + if (ctx->name##_cond) g_cond_free(ctx->name##_cond); \ + MUTEX_FREE(ctx, name##_cond) +#else /* HW_USE_THREADS */ +# define MUTEX_INIT(ctx, name) +# define MUTEX_FREE(ctx, name) +# define COND_INIT(ctx, name) +# define COND_FREE(ctx, name) +#endif /* HW_USE_THREADS */ + + +HWRunFunction ppu_run[] = { + (HWRunFunction)NULL +}; + +static int hw_sched_initialized = 0; + +int hw_sched_init(void) { + if (!hw_sched_initialized) { +#ifdef HW_USE_THREADS + g_thread_init(NULL); +#endif /* HW_USE_THREADS */ + hw_sched_initialized = 1; + } + + return 0; +} + + +int hw_sched_get_cpu_count(void) { +#ifdef HW_HAVE_SCHED_HEADERS + int err; + + int cpu_count; + cpu_set_t mask; + + err = sched_getaffinity(getpid(), sizeof(mask), &mask); + if (err) return 1; + +# ifdef CPU_COUNT + cpu_count = CPU_COUNT(&mask); +# else + for (cpu_count = 0; cpu_count < CPU_SETSIZE; cpu_count++) { + if (!CPU_ISSET(cpu_count, &mask)) break; + } +# endif + + if (!cpu_count) cpu_count = 1; + return cpu_count; +#else /* HW_HAVE_SCHED_HEADERS */ + return 1; +#endif /* HW_HAVE_SCHED_HEADERS */ +} + + +HWSched hw_sched_create(int cpu_count) { + int i; + int err = 0; + + HWSched ctx; + + //hw_sched_init(); + + ctx = (HWSched)malloc(sizeof(HWSchedS)); + if (!ctx) return NULL; + + memset(ctx, 0, sizeof(HWSchedS)); + + ctx->status = 1; + + MUTEX_INIT(ctx, sync); + MUTEX_INIT(ctx, data); + COND_INIT(ctx, compl); + COND_INIT(ctx, job); + + if (err) { + hw_sched_destroy(ctx); + return NULL; + } + + if (!cpu_count) cpu_count = hw_sched_get_cpu_count(); + if (cpu_count > HW_MAX_THREADS) cpu_count = HW_MAX_THREADS; + + ctx->n_threads = 0; + for (i = 0; i < cpu_count; i++) { + ctx->thread[ctx->n_threads] = hw_thread_create(ctx, ctx->n_threads, NULL, ppu_run, NULL); + if (ctx->thread[ctx->n_threads]) { +#ifndef HW_USE_THREADS + ctx->thread[ctx->n_threads]->status = HW_THREAD_STATUS_STARTING; +#endif /* HW_USE_THREADS */ + ++ctx->n_threads; + } + } + + if (!ctx->n_threads) { + hw_sched_destroy(ctx); + return NULL; + } + + return ctx; +} + +static int hw_sched_wait_threads(HWSched ctx) { +#ifdef HW_USE_THREADS + int i = 0; + + hw_sched_lock(ctx, compl_cond); + while (i < ctx->n_threads) { + for (; i < ctx->n_threads; i++) { + if (ctx->thread[i]->status == HW_THREAD_STATUS_INIT) { + hw_sched_wait(ctx, compl); + break; + } + } + + } + hw_sched_unlock(ctx, compl_cond); +#endif /* HW_USE_THREADS */ + + ctx->started = 1; + + return 0; +} + +void hw_sched_destroy(HWSched ctx) { + int i; + + if (ctx->n_threads > 0) { + if (!ctx->started) { + hw_sched_wait_threads(ctx); + } + + ctx->status = 0; + hw_sched_lock(ctx, job_cond); + hw_sched_broadcast(ctx, job); + hw_sched_unlock(ctx, job_cond); + + for (i = 0; i < ctx->n_threads; i++) { + hw_thread_destroy(ctx->thread[i]); + } + } + + COND_FREE(ctx, job); + COND_FREE(ctx, compl); + MUTEX_FREE(ctx, data); + MUTEX_FREE(ctx, sync); + + free(ctx); +} + +int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags) { + ctx->mode = HW_SCHED_MODE_SEQUENTIAL; + ctx->n_blocks = n_blocks; + ctx->cur_block = cur_block; + ctx->flags = flags; + + return 0; +} + +int hw_sched_get_chunk(HWSched ctx, int thread_id) { + int block; + + switch (ctx->mode) { + case HW_SCHED_MODE_PREALLOCATED: + if (ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING) { +#ifndef HW_USE_THREADS + ctx->thread[thread_id]->status = HW_THREAD_STATUS_DONE; +#endif /* HW_USE_THREADS */ + return thread_id; + } else { + return HW_SCHED_CHUNK_INVALID; + } + case HW_SCHED_MODE_SEQUENTIAL: + if ((ctx->flags&HW_SCHED_FLAG_INIT_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING)) { + return HW_SCHED_CHUNK_INIT; + } + hw_sched_lock(ctx, data); + block = *ctx->cur_block; + if (block < *ctx->n_blocks) { + *ctx->cur_block = *ctx->cur_block + 1; + } else { + block = HW_SCHED_CHUNK_INVALID; + } + hw_sched_unlock(ctx, data); + if (block == HW_SCHED_CHUNK_INVALID) { + if (((ctx->flags&HW_SCHED_FLAG_FREE_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING))) { + ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING; + return HW_SCHED_CHUNK_FREE; + } + if ((ctx->flags&HW_SCHED_FLAG_TERMINATOR_CALL)&&((ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING)||(ctx->thread[thread_id]->status == HW_THREAD_STATUS_FINISHING))) { + int i; + hw_sched_lock(ctx, data); + for (i = 0; i < ctx->n_threads; i++) { + if (thread_id == i) continue; + if ((ctx->thread[i]->status != HW_THREAD_STATUS_DONE)&&(ctx->thread[i]->status != HW_THREAD_STATUS_FINISHING2)&&(ctx->thread[i]->status != HW_THREAD_STATUS_IDLE)) { + break; + } + } + ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING2; + hw_sched_unlock(ctx, data); + if (i == ctx->n_threads) { + return HW_SCHED_CHUNK_TERMINATOR; + } + } + } + return block; + default: + return HW_SCHED_CHUNK_INVALID; + } + + return -1; +} + + +int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry) { +#ifdef HW_USE_THREADS + if (!ctx->started) { + hw_sched_wait_threads(ctx); + } +#else /* HW_USE_THREADS */ + int err; + int i, chunk_id, n_threads; + HWRunFunction run; + HWThread thrctx; +#endif /* HW_USE_THREADS */ + + ctx->ctx = appctx; + ctx->entry = entry; + + switch (ctx->mode) { + case HW_SCHED_MODE_SEQUENTIAL: + *ctx->cur_block = 0; + break; + default: + ; + } + +#ifdef HW_USE_THREADS + hw_sched_lock(ctx, compl_cond); + + hw_sched_lock(ctx, job_cond); + hw_sched_broadcast(ctx, job); + hw_sched_unlock(ctx, job_cond); +#else /* HW_USE_THREADS */ + n_threads = ctx->n_threads; + + for (i = 0; i < n_threads; i++) { + thrctx = ctx->thread[i]; + thrctx->err = 0; + } + + i = 0; + thrctx = ctx->thread[i]; + chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id); + + while (chunk_id >= 0) { + run = hw_run_entry(thrctx->runs, entry); + err = run(thrctx, thrctx->hwctx, chunk_id, appctx); + if (err) { + thrctx->err = err; + break; + } + + if ((++i) == n_threads) i = 0; + thrctx = ctx->thread[i]; + chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id); + } +#endif /* HW_USE_THREADS */ + + return 0; +} + +int hw_sched_wait_task(HWSched ctx) { + int err = 0; + int i = 0, n_threads = ctx->n_threads; + +#ifdef HW_USE_THREADS + while (i < ctx->n_threads) { + for (; i < ctx->n_threads; i++) { + if (ctx->thread[i]->status == HW_THREAD_STATUS_DONE) { + ctx->thread[i]->status = HW_THREAD_STATUS_IDLE; + } else { + hw_sched_wait(ctx, compl); + break; + } + } + + } + + hw_sched_unlock(ctx, compl_cond); +#endif /* HW_USE_THREADS */ + + for (i = 0; i < n_threads; i++) { + HWThread thrctx = ctx->thread[i]; + if (thrctx->err) return err = thrctx->err; + +#ifndef HW_USE_THREADS + thrctx->status = HW_THREAD_STATUS_IDLE; +#endif /* HW_USE_THREADS */ + } + + return err; +} + +int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry) { + int err; + + err = hw_sched_schedule_task(ctx, appctx, entry); + if (err) return err; + + return hw_sched_wait_task(ctx); +} + +int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry) { + int err; + + ctx->saved_mode = ctx->mode; + ctx->mode = HW_SCHED_MODE_PREALLOCATED; + err = hw_sched_schedule_task(ctx, appctx, entry); + + return err; +} + + +int hw_sched_wait_thread_task(HWSched ctx) { + int err; + + err = hw_sched_wait_task(ctx); + ctx->mode = ctx->saved_mode; + + return err; +} + +int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry) { + int err; + int saved_mode = ctx->mode; + + ctx->mode = HW_SCHED_MODE_PREALLOCATED; + err = hw_sched_execute_task(ctx, appctx, entry); + ctx->mode = saved_mode; + + return err; +} diff --git a/src/hw_sched.h b/src/hw_sched.h new file mode 100644 index 0000000..af9e363 --- /dev/null +++ b/src/hw_sched.h @@ -0,0 +1,146 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _HW_SCHED_H +#define _HW_SCHED_H + +#include <glib.h> + +typedef struct HWSchedT *HWSched; +#ifdef HW_USE_THREADS +typedef GMutex *HWMutex; +#else /* HW_USE_THREADS */ +typedef void *HWMutex; +#endif /* HW_USE_THREADS */ + + +#include "hw_thread.h" + +enum HWSchedModeT { + HW_SCHED_MODE_PREALLOCATED = 0, + HW_SCHED_MODE_SEQUENTIAL +}; +typedef enum HWSchedModeT HWSchedMode; + +enum HWSchedChunkT { + HW_SCHED_CHUNK_INVALID = -1, + HW_SCHED_CHUNK_INIT = -2, + HW_SCHED_CHUNK_FREE = -3, + HW_SCHED_CHUNK_TERMINATOR = -4 +}; +typedef enum HWSchedChunkT HWSchedChunk; + +enum HWSchedFlagsT { + HW_SCHED_FLAG_INIT_CALL = 1, //! Executed in each thread before real chunks + HW_SCHED_FLAG_FREE_CALL = 2, //! Executed in each thread after real chunks + HW_SCHED_FLAG_TERMINATOR_CALL = 4 //! Executed in one of the threads after all threads are done +}; +typedef enum HWSchedFlagsT HWSchedFlags; + + +#define HW_SINGLE_MODE +//#define HW_DETECT_CPU_CORES +#define HW_MAX_THREADS 128 + +#ifdef HW_SINGLE_MODE + typedef HWRunFunction HWEntry; +# define hw_run_entry(runs, entry) entry +#else /* HW_SINGLE_MODE */ + typedef int HWEntry; +# define hw_run_entry(runs, entry) runs[entry] +#endif /* HW_SINGLE_MODE */ + +#ifndef HW_HIDE_DETAILS +struct HWSchedT { + int status; + int started; + + int n_threads; + HWThread thread[HW_MAX_THREADS]; + +#ifdef HW_USE_THREADS + GCond *job_cond, *compl_cond; + GMutex *job_cond_mutex, *compl_cond_mutex, *data_mutex; + GMutex *sync_mutex; +#endif /* HW_USE_THREADS */ + + HWSchedMode mode; + HWSchedMode saved_mode; + HWSchedFlags flags; + int *n_blocks; + int *cur_block; + + HWEntry entry; + void *ctx; +}; +typedef struct HWSchedT HWSchedS; +#endif /* HW_HIDE_DETAILS */ + +# ifdef __cplusplus +extern "C" { +# endif + +HWSched hw_sched_create(int ppu_count); +int hw_sched_init(void); +void hw_sched_destroy(HWSched ctx); +int hw_sched_get_cpu_count(void); + +int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags); +int hw_sched_get_chunk(HWSched ctx, int thread_id); +int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry); +int hw_sched_wait_task(HWSched ctx); +int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry); + +int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry); +int hw_sched_wait_thread_task(HWSched ctx); +int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry); + +HWMutex hw_sched_create_mutex(void); +void hw_sched_destroy_mutex(HWMutex ctx); + +#ifdef HW_USE_THREADS +# define hw_sched_lock(ctx, type) g_mutex_lock(ctx->type##_mutex) +# define hw_sched_unlock(ctx, type) g_mutex_unlock(ctx->type##_mutex) +# define hw_sched_broadcast(ctx, type) g_cond_broadcast(ctx->type##_cond) +# define hw_sched_signal(ctx, type) g_cond_signal(ctx->type##_cond) +# define hw_sched_wait(ctx, type) g_cond_wait(ctx->type##_cond, ctx->type##_cond_mutex) + +#define hw_sched_create_mutex(void) g_mutex_new() +#define hw_sched_destroy_mutex(ctx) g_mutex_free(ctx) +#define hw_sched_lock_mutex(ctx) g_mutex_lock(ctx) +#define hw_sched_unlock_mutex(ctx) g_mutex_unlock(ctx) +#else /* HW_USE_THREADS */ +# define hw_sched_lock(ctx, type) +# define hw_sched_unlock(ctx, type) +# define hw_sched_broadcast(ctx, type) +# define hw_sched_signal(ctx, type) +# define hw_sched_wait(ctx, type) + +#define hw_sched_create_mutex(void) NULL +#define hw_sched_destroy_mutex(ctx) +#define hw_sched_lock_mutex(ctx) +#define hw_sched_unlock_mutex(ctx) +#endif /* HW_USE_THREADS */ + +# ifdef __cplusplus +} +# endif + +#endif /* _HW_SCHED_H */ + diff --git a/src/hw_thread.c b/src/hw_thread.c new file mode 100644 index 0000000..0374630 --- /dev/null +++ b/src/hw_thread.c @@ -0,0 +1,143 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "hw_config.h" + +#include "hw_sched.h" +#include "hw_thread.h" + +#ifdef HW_USE_THREADS +static void *hw_thread_function(HWThread ctx) { + int err; + int chunk_id; + + HWRunFunction *runs; + HWRunFunction run; + HWSched sched; + void *hwctx; + + sched = ctx->sched; + runs = ctx->run; + hwctx = ctx->hwctx; + + hw_sched_lock(sched, job_cond); + + hw_sched_lock(sched, compl_cond); + ctx->status = HW_THREAD_STATUS_IDLE; + hw_sched_broadcast(sched, compl); + hw_sched_unlock(sched, compl_cond); + + while (sched->status) { + hw_sched_wait(sched, job); + if (!sched->status) break; + + ctx->err = 0; + ctx->status = HW_THREAD_STATUS_STARTING; + hw_sched_unlock(sched, job_cond); + + run = hw_run_entry(runs, sched->entry); +#if 0 + // Offset to interleave transfers if the GPUBox is used + // Just check with CUDA_LAUNCH_BLOCKED the togpu time and put it here + // It should be still significantly less than BP time + // We can do callibration during initilization in future + + usleep(12000 * ctx->thread_id); +#endif + chunk_id = hw_sched_get_chunk(sched, ctx->thread_id); + + /* Should be after get_chunk, since we can check if it's first time */ + ctx->status = HW_THREAD_STATUS_RUNNING; + while (chunk_id != HW_SCHED_CHUNK_INVALID) { + //printf("Thread %i processing slice %i\n", ctx->thread_id, chunk_id); + err = run(ctx, hwctx, chunk_id, sched->ctx); + if (err) { + ctx->err = err; + break; + } + chunk_id = hw_sched_get_chunk(sched, ctx->thread_id); + } + + hw_sched_lock(sched, job_cond); + + hw_sched_lock(sched, compl_cond); + ctx->status = HW_THREAD_STATUS_DONE; + hw_sched_broadcast(sched, compl); + hw_sched_unlock(sched, compl_cond); + } + + hw_sched_unlock(sched, job_cond); + + g_thread_exit(NULL); + return NULL; /* TODO: check this */ +} +#endif /* HW_USE_THREADS */ + + +HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func) { + GError *err; + + HWThread ctx; + + ctx = (HWThread)malloc(sizeof(HWThreadS)); + if (!ctx) return ctx; + + memset(ctx, 0, sizeof(HWThreadS)); + + ctx->sched = sched; + ctx->hwctx = hwctx; + ctx->run = run_func; + ctx->free = free_func; + ctx->thread_id = thread_id; + ctx->status = HW_THREAD_STATUS_INIT; + +#ifdef HW_USE_THREADS + ctx->thread = g_thread_create((GThreadFunc)hw_thread_function, ctx, 1, &err); + if (!ctx->thread) { + g_error_free(err); + + hw_thread_destroy(ctx); + return NULL; + } +#endif /* HW_USE_THREADS */ + + return ctx; +} + +void hw_thread_destroy(HWThread ctx) { +#ifdef HW_USE_THREADS + if (ctx->thread) { + g_thread_join(ctx->thread); + } +#endif /* HW_USE_THREADS */ + + if (ctx->data) { + free(ctx->data); + } + + if (ctx->free) { + ctx->free(ctx->hwctx); + } + + free(ctx); +} diff --git a/src/hw_thread.h b/src/hw_thread.h new file mode 100644 index 0000000..de7f60f --- /dev/null +++ b/src/hw_thread.h @@ -0,0 +1,76 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _HW_THREAD_H +#define _HW_THREAD_H + +#include <glib.h> + +typedef struct HWThreadT *HWThread; +typedef int (*HWRunFunction)(HWThread thread, void *ctx, int block, void *attr); +typedef int (*HWFreeFunction)(void *ctx); + +#include "hw_sched.h" + +enum HWThreadStatusT { + HW_THREAD_STATUS_IDLE = 0, + HW_THREAD_STATUS_STARTING = 1, + HW_THREAD_STATUS_RUNNING = 2, + HW_THREAD_STATUS_FINISHING = 3, + HW_THREAD_STATUS_FINISHING2 = 4, + HW_THREAD_STATUS_DONE = 5, + HW_THREAD_STATUS_INIT = 6 +}; +typedef enum HWThreadStatusT HWThreadStatus; + + +#ifndef HW_HIDE_DETAILS +struct HWThreadT { + int thread_id; + HWSched sched; + +#ifdef HW_USE_THREADS + GThread *thread; +#endif /* HW_USE_THREADS */ + + void *hwctx; + HWRunFunction *run; + HWFreeFunction free; + + int err; + HWThreadStatus status; + + void *data; /**< Per-thread data storage, will be free'd if set */ +}; +typedef struct HWThreadT HWThreadS; +#endif /* HW_HIDE_DETAILS */ + +# ifdef __cplusplus +extern "C" { +# endif + +HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func); +void hw_thread_destroy(HWThread ctx); + +# ifdef __cplusplus +} +# endif + + +#endif /* _HW_THREAD_H */ diff --git a/src/meson.build b/src/meson.build index 324c744..fd41361 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,5 +1,4 @@ plugins = [ - 'roof-read', 'roof-build', 'roof-filter', 'roof-flat-field-correct' @@ -10,11 +9,11 @@ roof_common_src = [ ] roof_plugin_src = { - 'roof-read': [ + 'roof-build': [ + 'hw_sched.c', + 'hw_thread.c', 'ufo-roof-read-socket.c', 'ufo-roof-read-file.c', - ], - 'roof-build': [ 'ufo-roof-buffer.c', ], 'roof-filter': [ diff --git a/src/ufo-roof-buffer.c b/src/roof-buffer.c index 0e0a890..4ca0386 100644 --- a/src/ufo-roof-buffer.c +++ b/src/roof-buffer.c @@ -9,14 +9,15 @@ // This is currently not thread safe. With dual-filter architecture this will be called sequentially. -UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) { +RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) { if ((n_dims < 1)||(n_dims > 2)) roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims); - UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer)); - if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); + RoofBuffer *buffer = (RoofBuffer*)calloc(1, sizeof(RoofBuffer)); + if (!buffer) roof_new_error(error, "Can't allocate RoofBuffer"); buffer->max_datasets = max_datasets; + buffer->n_streams = cfg->n_streams; buffer->ring_size = cfg->buffer_size; buffer->drop_buffers = cfg->drop_buffers; buffer->latency_buffers = cfg->latency_buffers; @@ -36,26 +37,27 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int)); buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - buffer->first_id = malloc(buffer->ring_size * sizeof(guint64)); +#ifdef ROOF_INDEPENDENT_STREAMS + buffer->first_id = malloc(buffer->n_streams * sizeof(guint64)); if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets"); - for (guint i = 0; i < buffer->ring_size; i++) + for (guint i = 0; i < buffer->n_streams; i++) buffer->first_id[i] = (guint64)-1; #else buffer->first_id = (guint64)-1; #endif + buffer->last_id = malloc(buffer->n_streams * sizeof(guint64)); if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { - ufo_roof_buffer_free(buffer); + roof_buffer_free(buffer); roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); } return buffer; } -void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { +void roof_buffer_free(RoofBuffer *buffer) { if (buffer) { -#ifdef UFO_ROOF_INDEPENDENT_STREAMS +#ifdef ROOF_INDEPENDENT_STREAMS if (buffer->first_id) free(buffer->first_id); #endif @@ -71,7 +73,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { } // fragment_id is numbered from 1 (0 - means auto) -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) { +gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) { gboolean ready = FALSE; guint buffer_id; guint64 first_id; @@ -85,7 +87,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; -#ifdef UFO_ROOF_INDEPENDENT_STREAMS +#ifdef ROOF_INDEPENDENT_STREAMS if (buffer->first_id[stream_id] == (guint64)-1) buffer->first_id[stream_id] = dataset_id; first_id = buffer->first_id[stream_id]; @@ -102,7 +104,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? if (dataset_id < buffer->current_id) { - roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id); +// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id); return FALSE; } @@ -110,24 +112,39 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id); return FALSE; } + + buffer->last_id[stream_id] = dataset_id; + // We are not fast enough, new packets are arrvining to fast if (dataset_id >= (buffer->current_id + buffer->ring_size)) { // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed", - dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + guint64 min_id = (guint64)-1; + guint64 max_id = 0; + for (guint i = 0; i < buffer->n_streams; i++) { + if (min_id > buffer->last_id[stream_id]) min_id = buffer->last_id[stream_id]; + if (max_id < buffer->last_id[stream_id]) max_id = buffer->last_id[stream_id]; + } + printf("Desync: %lu (Min: %lu, Max: %lu), stream: %u (of %u), dataset: %lu\n", max_id - min_id, min_id, max_id, stream_id, buffer->n_streams, dataset_id); + printf("Parts: %i %i %i %i\n", buffer->n_fragments[buffer_id], buffer->n_fragments[(buffer_id + 1)%buffer->ring_size], buffer->n_fragments[(buffer_id + 2)%buffer->ring_size], buffer->n_fragments[(buffer_id + 3)%buffer->ring_size]); + + root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed, already reported %lu", + dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset, buffer->n_ready); // FIXME: Send semi-complete buffers further? // FIXME: Or shall we drop more if larger buffers are allocated? if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) { memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint)); + // FIXME: Can finally received old buffer hit the counter here? Locking? buffer->current_id = dataset_id; } else { for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++) buffer->n_fragments[i%buffer->ring_size] = 0; + // FIXME: Can finally received old buffers hit the counters here? Locking? buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1; } + // FIXME: Can other threads obsolete the buffer meanwhile? This is not single threaded any more if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset) ready = TRUE; @@ -145,6 +162,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu ); */ + // FIXME: What if buffer obsolete meanwhile by other threads. We are not single threaded uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; if (buffer->n_dims == 2) { uint8_t *fragment_buffer = dataset_buffer + @@ -161,20 +179,25 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu } // FIXME: Sanity checks: verify is not a dublicate fragment? - atomic_fetch_add(&buffer->n_fragments[buffer_id], 1); + // This stuff is anyway is single-threaded which is a problem. + //atomic_fetch_add(&buffer->n_fragments[buffer_id], 1); + buffer->n_fragments[buffer_id]++; if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? if (dataset_id == buffer->current_id) ready = TRUE; - else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) - ready = ufo_roof_buffer_skip_to_ready(buffer); + else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) { + ready = roof_buffer_skip_to_ready(buffer); + } + + if (ready) buffer->n_ready++; } - + return ready; } -gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) { +gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer) { for (guint i = 0; i < buffer->ring_size; i++) { guint64 id = buffer->current_id + i; guint buffer_id = id % buffer->ring_size; @@ -184,7 +207,7 @@ gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) { return TRUE; } -// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + printf("Skipping dataset %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); buffer->n_fragments[buffer_id] = 0; } @@ -192,7 +215,7 @@ gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) { } -gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) { +gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) { guint buffer_id = buffer->current_id % buffer->ring_size; void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; @@ -207,3 +230,23 @@ gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buff return TRUE; } + +gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error) { + struct timespec start_time, current_time; + clock_gettime(CLOCK_REALTIME, &start_time); + + guint buffer_id = buffer->current_id % buffer->ring_size; + + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + while (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) { + // FIXME: get some sleep? How we can interrupt it? + // just small nanosleep? + if (timeout) { + clock_gettime(CLOCK_REALTIME, ¤t_time); + gulong wait = (current_time.tv_sec - start_time.tv_sec) * 1000000 + (current_time.tv_nsec - start_time.tv_nsec) / 1000; + if (wait > timeout) return FALSE; + } + } + + return roof_buffer_get_dataset(buffer, output_buffer, seqid, error); +} diff --git a/src/ufo-roof-buffer.h b/src/roof-buffer.h index 8e9c00b..a95dbaa 100644 --- a/src/ufo-roof-buffer.h +++ b/src/roof-buffer.h @@ -1,20 +1,24 @@ -#ifndef __UFO_ROOF_BUFFER_H -#define __UFO_ROOF_BUUFER_H +#ifndef __ROOF_BUFFER_H +#define __ROOF_BUFFER_H // This IS harmful! Just for testing -//#define UFO_ROOF_INDEPENDENT_STREAMS +#define ROOF_INDEPENDENT_STREAMS #include <stdatomic.h> +#include <stdint.h> +#include "ufo-roof-config.h" - -struct _UfoRoofBuffer { +struct _RoofBuffer { guint64 current_id; // The ID of the first (active) dataset in the buffer -#ifdef UFO_ROOF_INDEPENDENT_STREAMS +#ifdef ROOF_INDEPENDENT_STREAMS guint64 *first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known #else guint64 first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known #endif + guint64 *last_id; + guint64 n_ready; + guint n_streams; guint ring_size; // Number of datasets to buffer guint drop_buffers; // If we need to catch up guint latency_buffers; // we skip incomplete buffers if current_id + latency_buffers is ready @@ -35,13 +39,14 @@ struct _UfoRoofBuffer { guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready) }; -typedef struct _UfoRoofBuffer UfoRoofBuffer; +typedef struct _RoofBuffer RoofBuffer; -UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error); -void ufo_roof_buffer_free(UfoRoofBuffer *buf); +RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error); +void roof_buffer_free(RoofBuffer *buf); -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error); -gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer); -gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error); +gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error); +gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer); +gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error); +gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error); #endif diff --git a/src/ufo-roof-config.c b/src/roof-config.c index 944ee31..189c08f 100644 --- a/src/ufo-roof-config.c +++ b/src/roof-config.c @@ -34,14 +34,14 @@ typedef struct { - UfoRoofConfig cfg; + RoofConfig cfg; JsonParser *parser; -} UfoRoofConfigPrivate; +} RoofConfigPrivate; -void ufo_roof_config_free(UfoRoofConfig *cfg) { +void roof_config_free(RoofConfig *cfg) { if (cfg) { - UfoRoofConfigPrivate *priv = (UfoRoofConfigPrivate*)cfg; + RoofConfigPrivate *priv = (RoofConfigPrivate*)cfg; if (priv->parser) g_object_unref (priv->parser); @@ -50,9 +50,9 @@ void ufo_roof_config_free(UfoRoofConfig *cfg) { } } -UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error) { - UfoRoofConfigPrivate *priv; - UfoRoofConfig *cfg; +RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error) { + RoofConfigPrivate *priv; + RoofConfig *cfg; // JsonNode *node; JsonObject *root = NULL; @@ -67,10 +67,10 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError *gerr = NULL; - priv = (UfoRoofConfigPrivate*)malloc(sizeof(UfoRoofConfigPrivate)); - if (!priv) roof_new_error(error, "Can't allocate UfoRoofConfig"); + priv = (RoofConfigPrivate*)malloc(sizeof(RoofConfigPrivate)); + if (!priv) roof_new_error(error, "Can't allocate RoofConfig"); - memset(priv, 0, sizeof(UfoRoofConfigPrivate)); + memset(priv, 0, sizeof(RoofConfigPrivate)); // Set defaults cfg = &priv->cfg; @@ -87,8 +87,8 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, cfg->port = 52067; cfg->n_streams = 1; cfg->protocol = "udp"; - cfg->network_timeout = 10000000; - cfg->header_size = sizeof(UfoRoofPacketHeader); + cfg->network_timeout = 100000000; // FIXME: remove 0 + cfg->header_size = sizeof(RoofPacketHeader); cfg->payload_size = 0; cfg->max_packet_size = 0; cfg->max_packets = 100; @@ -96,6 +96,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, cfg->buffer_size = 2; cfg->latency_buffers = 0; cfg->drop_buffers = 0; + cfg->sockets_per_thread = 4; // Read configuration @@ -104,7 +105,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, if (gerr != NULL) { g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config); - ufo_roof_config_free(cfg); + roof_config_free(cfg); return NULL; } @@ -119,7 +120,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, roof_config_node_get(performance, root, object, "performance"); roof_config_node_get(data, root, object, "data"); - if (flags&UFO_ROOF_CONFIG_SIMULATION) + if (flags&ROOF_CONFIG_SIMULATION) roof_config_node_get(simulation, root, object, "simulation"); } @@ -135,12 +136,12 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, if ((cfg->sample_rate)||(cfg->imaging_rate)) { if ((!cfg->sample_rate)||(!cfg->imaging_rate)||(cfg->sample_rate%cfg->imaging_rate)) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "Invalid sample (%u) and imaging (%u) rates are specified", cfg->sample_rate, cfg->imaging_rate); } if ((json_object_get_member(hardware, "samples_per_rotation"))&&(cfg->samples_per_rotation != (cfg->sample_rate / cfg->imaging_rate))) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "The specified samples-per-rotation (%u) doesn't match sample/imaging rates (%u / %u)", cfg->samples_per_rotation, cfg->sample_rate, cfg->imaging_rate); } @@ -148,7 +149,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, } if ((cfg->bit_depth%8)||(cfg->bit_depth > 32)) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "Invalid bit-depth (%u) is configured, only 8, 16, 24, 32 is currently supported", cfg->bit_depth); } @@ -170,13 +171,13 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, roof_config_node_get(cfg->dataset_size, network, int, "dataset_size"); if (!cfg->payload_size) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "Packet payload and header size must be set"); } - if ((cfg->header_size < sizeof(UfoRoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) { - ufo_roof_config_free(cfg); - roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(UfoRoofPacketHeader)); + if ((cfg->header_size < sizeof(RoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) { + roof_config_free(cfg); + roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(RoofPacketHeader)); } if (!cfg->dataset_size) @@ -195,6 +196,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size"); roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers"); roof_config_node_get(cfg->latency_buffers, performance, int, "latency_buffers"); + roof_config_node_get(cfg->sockets_per_thread, performance, int, "sockets_per_thread"); } @@ -204,13 +206,13 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, // Dataset should be split in an integer number of network packets (we don't expect data from different datasets in one packet at the moment) if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams); } // Packet should contain an integer number of complete projections (their parts provided by a single module) if ((cfg->roof_mode)&&(cfg->payload_size % (cfg->channels_per_module * (cfg->bit_depth / 8)))) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "Inconsistent ROOF configuration: packet_size=%u, projection_size=%u (%u channels x %u bits)", cfg->payload_size, cfg->channels_per_module * (cfg->bit_depth / 8), cfg->channels_per_module, cfg->bit_depth); } @@ -219,12 +221,12 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, if (hardware) { if (cfg->n_modules != cfg->n_streams) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "Currently, number of ROOF modules (%u) is exepcted to be equal to number of independent data streams (%u)", cfg->n_modules, cfg->n_streams); } if (cfg->dataset_size != (cfg->fan_projections * cfg->fan_bins * cfg->bit_depth / 8)) { - ufo_roof_config_free(cfg); + roof_config_free(cfg); roof_new_error(error, "Specified dataset size (%u) does not match ROOF configuration (modules: %u, channels-per-module: %u, bit-depth: %u, samples-per-rotation: %u)", cfg->dataset_size, cfg->n_modules, cfg->channels_per_module, cfg->bit_depth, cfg->samples_per_rotation); } } diff --git a/src/ufo-roof-config.h b/src/roof-config.h index 8718381..f28cb60 100644 --- a/src/ufo-roof-config.h +++ b/src/roof-config.h @@ -1,5 +1,5 @@ -#ifndef __UFO_ROOF_CONFIG_H -#define __UFO_ROOF_CONFIG_H +#ifndef __ROOF_CONFIG_H +#define __ROOF_CONFIG_H #include <glib.h> @@ -39,7 +39,7 @@ typedef struct { guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up. guint latency_buffers; // We skip incomplete buffers if later (at least latency_buffer in future) dataset is already ready, 0 - never skip guint network_timeout; // Maximum time (us) to wait for data on the socket - + guint sockets_per_thread; // Number of sockets per thread. Optimally number of therads should not exceed number of CPU cores @@ -50,15 +50,15 @@ typedef struct { -} UfoRoofConfig; +} RoofConfig; typedef enum { - UFO_ROOF_CONFIG_DEFAULT = 0, - UFO_ROOF_CONFIG_SIMULATION = 1 -} UfoRoofConfigFlags; + ROOF_CONFIG_DEFAULT = 0, + ROOF_CONFIG_SIMULATION = 1 +} RoofConfigFlags; -UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error); -void ufo_roof_config_free(UfoRoofConfig *cfg); +RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error); +void roof_config_free(RoofConfig *cfg); -#endif /* __UFO_ROOF_CONFIG_H */ +#endif /* __ROOF_CONFIG_H */ diff --git a/src/ufo-roof-error.h b/src/roof-error.h index 5491f31..418645c 100644 --- a/src/ufo-roof-error.h +++ b/src/roof-error.h @@ -1,5 +1,5 @@ -#ifndef __UFO_ROOF_ERROR_H -#define __UFO_ROOF_ERROR_H +#ifndef __ROOF_ERROR_H +#define __ROOF_ERROR_H #include <ufo/ufo.h> @@ -38,6 +38,9 @@ #define roof_setup_error(error, ...) \ roof_error(error, SETUP, __VA_ARGS__) +#define roof_setup_error_with_retval(error, ...) \ + roof_error_with_retval(error, retval, SETUP, __VA_ARGS__) + #define roof_new_error(error, ...) \ roof_error_with_retval(error, NULL, SETUP, __VA_ARGS__) @@ -55,4 +58,4 @@ roof_error(error, SETUP, __VA_ARGS__) -#endif /* __UFO_ROOF_ERROR_H */ +#endif /* __ROOF_ERROR_H */ diff --git a/src/ufo-roof-read-file.c b/src/roof-read-file.c index 4ee11c6..da8d51c 100644 --- a/src/ufo-roof-read-file.c +++ b/src/roof-read-file.c @@ -1,5 +1,6 @@ #include <stdio.h> #include <errno.h> +#include <assert.h> #include <stdint.h> #include "glib.h" @@ -8,64 +9,71 @@ #include "ufo-roof-read-file.h" typedef struct { - UfoRoofReadInterface iface; + RoofReadInterface iface; + + RoofConfig *cfg; - UfoRoofConfig *cfg; - gchar *fname; FILE *fd; -} UfoRoofReadFile; + uint8_t *buf; +} RoofReadFile; + +static void roof_read_file_free(RoofReadInterface *iface) { + RoofReadFile *reader = (RoofReadFile*)iface; -static void ufo_roof_read_file_free(UfoRoofReadInterface *iface) { - UfoRoofReadFile *reader = (UfoRoofReadFile*)iface; - if (reader) { if (reader->fname) g_free(reader->fname); if (reader->fd) fclose(reader->fd); - + + if (reader->buf) + free(reader->buf); free(reader); } } -static guint ufo_roof_read_file(UfoRoofReadInterface *iface, uint8_t *buffers, GError **error) { - UfoRoofReadFile *reader = (UfoRoofReadFile*)iface; - UfoRoofConfig *cfg = reader->cfg; +static guint roof_read_file(RoofReadInterface *iface, uint8_t **buffers, GError **error) { + RoofReadFile *reader = (RoofReadFile*)iface; + RoofConfig *cfg = reader->cfg; + + assert(iface); + assert(buffers); size_t bytes = 0; size_t packet_size = cfg->header_size + cfg->payload_size; size_t expected = cfg->max_packets * packet_size; while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) { - size_t ret = fread(buffers + bytes, 1, expected - bytes, reader->fd); + size_t ret = fread(reader->buf + bytes, 1, expected - bytes, reader->fd); bytes += ret; } guint packets = bytes / packet_size; - + if (ferror(reader->fd)) { roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd)); } else if ((feof(reader->fd))&&(bytes % packet_size)) { roof_network_error_with_retval(error, packets, "extra data in the end of input"); } - + + *buffers = reader->buf; return packets; } -UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error) { - UfoRoofReadFile *reader = (UfoRoofReadFile*)calloc(1, sizeof(UfoRoofReadFile)); - if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadFile"); +RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error) { + RoofReadFile *reader = (RoofReadFile*)calloc(1, sizeof(RoofReadFile)); + if (!reader) roof_new_error(error, "Can't allocate RoofReadFile"); // FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now. if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size) - roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if UfoRoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size); + roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if RoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size); reader->cfg = cfg; - reader->iface.close = ufo_roof_read_file_free; - reader->iface.read =ufo_roof_read_file; + reader->iface.close = roof_read_file_free; + reader->iface.read =roof_read_file; reader->fname = g_strdup_printf(path, file_id); if (!reader->fname) { @@ -80,5 +88,11 @@ UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *pat roof_new_error(error, "Can't open file %i at path %s", file_id, path); } - return (UfoRoofReadInterface*)reader; + reader->buf = (uint8_t*)malloc(cfg->max_packets * (cfg->header_size + cfg->payload_size)); + if (!reader->buf) { + roof_read_file_free((RoofReadInterface*)reader); + roof_new_error(error, "Can't allocate file buffer"); + } + + return (RoofReadInterface*)reader; } diff --git a/src/roof-read-file.h b/src/roof-read-file.h new file mode 100644 index 0000000..60f60ba --- /dev/null +++ b/src/roof-read-file.h @@ -0,0 +1,8 @@ +#ifndef __ROOF_READ_FILE_H +#define __ROOF_READ_FILE_H + +#include "ufo-roof-read.h" + +RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error); + +#endif diff --git a/src/ufo-roof-read-socket.c b/src/roof-read-socket.c index e8f7ce4..1b98b44 100644 --- a/src/ufo-roof-read-socket.c +++ b/src/roof-read-socket.c @@ -2,6 +2,7 @@ #include <stdio.h> #include <unistd.h> +#include <assert.h> #include <errno.h> #include <sys/types.h> #include <sys/socket.h> @@ -13,48 +14,51 @@ #include "ufo-roof-read-socket.h" typedef struct { - UfoRoofReadInterface iface; + RoofReadInterface iface; + + RoofConfig *cfg; - UfoRoofConfig *cfg; int socket; -} UfoRoofReadSocket; + struct mmsghdr *msg; + struct iovec *msgvec; + uint8_t *buf; + + int libvma; +} RoofReadSocket; + +static void roof_read_socket_free(RoofReadInterface *iface) { + RoofReadSocket *reader = (RoofReadSocket*)iface; -static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) { - UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface; - if (reader) { if (reader->socket >= 0) close(reader->socket); + if (reader->msgvec) + free(reader->msgvec); + if (reader->msg) + free(reader->msg); + if (reader->buf) + free(reader->buf); free(reader); } } -static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) { +static guint roof_read_socket(RoofReadInterface *iface, uint8_t **buf, GError **error) { int packets; struct timespec timeout_ts; - UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface; - UfoRoofConfig *cfg = reader->cfg; + assert(iface); + assert(buf); - struct mmsghdr msg[cfg->max_packets]; - struct iovec msgvec[cfg->max_packets]; + RoofReadSocket *reader = (RoofReadSocket*)iface; + RoofConfig *cfg = reader->cfg; timeout_ts.tv_sec = cfg->network_timeout / 1000000; timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000); - // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build? - memset(msg, 0, sizeof(msg)); - memset(msgvec, 0, sizeof(msgvec)); - for (guint i = 0; i < cfg->max_packets; i++) { - msgvec[i].iov_base = buf + i * cfg->max_packet_size; - msgvec[i].iov_len = cfg->max_packet_size; - msg[i].msg_hdr.msg_iov = &msgvec[i]; - msg[i].msg_hdr.msg_iovlen = 1; - } //retry: // Timeout seems broken, see BUGS in 'recvmmsg' bugs page - packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); + packets = recvmmsg(reader->socket, reader->msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); /* @@ -64,25 +68,25 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr } */ + *buf = reader->buf; return (guint)packets; } - -UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) { +RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error) { int err; int port = cfg->port + id; char port_str[16]; - const char *addr_str = "0.0.0.0"; + const char *addr_str = "192.168.100.8"; struct addrinfo sockaddr_hints; struct addrinfo *sockaddr_info; - UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket)); - if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket"); + RoofReadSocket *reader = (RoofReadSocket*)calloc(1, sizeof(RoofReadSocket)); + if (!reader) roof_new_error(error, "Can't allocate RoofReadSocket"); reader->cfg = cfg; - reader->iface.close = ufo_roof_read_socket_free; - reader->iface.read =ufo_roof_read_socket; - + reader->iface.close = roof_read_socket_free; + reader->iface.read =roof_read_socket; + snprintf(port_str, sizeof(port_str), "%d", port); port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0'; @@ -121,14 +125,35 @@ UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GEr } /* - // Send ping request to force initialization + // Check that FPGA module is ready char msg[4]; addr_str = "192.168.34.83"; getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); */ - freeaddrinfo(sockaddr_info); - return (UfoRoofReadInterface*)reader; + if (reader->libvma) { + } else { + reader->buf = (uint8_t*)malloc(cfg->max_packets * cfg->max_packet_size); + reader->msg = (struct mmsghdr*)malloc(cfg->max_packets * sizeof(struct mmsghdr)); + reader->msgvec = (struct iovec*)malloc(cfg->max_packets * sizeof(struct iovec)); + + if ((!reader->buf)||(!reader->msg)||(!reader->msgvec)) { + roof_read_socket_free((RoofReadInterface*)reader); + roof_new_error(error, "Can't allocate socket buffer"); + } + + memset(reader->msg, 0, cfg->max_packets * sizeof(struct mmsghdr)); + memset(reader->msgvec, 0, cfg->max_packets * sizeof(struct iovec)); + for (guint i = 0; i < cfg->max_packets; i++) { + reader->msgvec[i].iov_base = reader->buf + i * cfg->max_packet_size; + reader->msgvec[i].iov_len = cfg->max_packet_size; + reader->msg[i].msg_hdr.msg_iov = &reader->msgvec[i]; + reader->msg[i].msg_hdr.msg_iovlen = 1; + } + } + + + return (RoofReadInterface*)reader; } diff --git a/src/roof-read-socket.h b/src/roof-read-socket.h new file mode 100644 index 0000000..34a98e8 --- /dev/null +++ b/src/roof-read-socket.h @@ -0,0 +1,8 @@ +#ifndef __ROOF_READ_SOCKET_H +#define __ROOF_READ_SOCKET_H + +#include "ufo-roof-read.h" + +RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error); + +#endif diff --git a/src/roof-read.c b/src/roof-read.c new file mode 100644 index 0000000..18a7508 --- /dev/null +++ b/src/roof-read.c @@ -0,0 +1,107 @@ +#include <stdio.h> + +#include "roof.h" +#include "roof-read.h" + +RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error) { + guint i; + GError *gerr = NULL; + + RoofReadContext *rdc = (RoofReadContext*)calloc(1, sizeof(RoofReadContext)); + if (!rdc) roof_new_error(error, "Can't allocate RoofReadContext"); + + rdc->cfg = cfg; + rdc->rdi = rdi; + + return rdc; +} + +void roof_read_context_free(RoofReadContext *rdc) { + if (rdc) { + free(rdc); + } +} + +void roof_read(RoofReadContext *rdc, RoofBufferInterface *bfi, GError *error) { + uint8_t *rdbuf = rdc->rdbuf; + guint n_packets = rdc->n_packets; + guint packet_id = rdc->packet_id; + guint fragment_id = rdc->fragment_id; + GError *gerr = NULL; + + ReadRoofInterface *rdi = rdc->rdi; + + if (rdc->rdbuf) { + rdbuf = rdc->rdbuf; + packets + else { + guint packets = rdi->read(rdi[stream_id], &rdbuf, &gerr); + if (gerr) roof_propagate_error(error, gerr, "roof_read: "); + packet_id = 0; + fragment_id = 0; + } + + packet = rdbuf + packet_id * (size + padding + + while (packet) { + + + while (rdbuf) { + // + + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(RoofPacketHeader)) { + RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } else { + // FIXME: consider consecutive + //fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // FIXME: packet may contain fragments for multiple datasets + fragment_id = packet_id * fragments_per_packet; + guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream; + guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream; + + // FIXME: verify that packet is consecutive + // if + + // Drop packets of already skipped datasets + if (dataset_id < priv->current_id) + continue; + + // FIXME: stop processing and return.... + if (dataset_id < priv->current_id) + + uint8_t *fragment_buffer = dataset_buffer + + stream_id * fragment_dims[0] + // x-coordinate + (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate + + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + + +// buffer->last_id[stream_id] = dataset_id; + + ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + + + } + + + } + + + + + void *buf = roof_buffer_get_dataset(bfi, ?); + + // Computing offsets each time, etc. + roof_buffer_set_fragment +}
\ No newline at end of file diff --git a/src/roof-read.h b/src/roof-read.h new file mode 100644 index 0000000..1ab846a --- /dev/null +++ b/src/roof-read.h @@ -0,0 +1,45 @@ +#ifndef __ROOF_READ_H +#define __ROOF_READ_H + +typedef struct _RoofRead RoofRead; + +#include "roof-config.h" + +G_BEGIN_DECLS + +typedef struct _RoofReadContext RoofReadContext; +typedef struct _RoofReadInterface RoofReadInterface; +typedef struct _RoofReadInterfaceSettings RoofReadInterfaceSettings; + +typedef guint (*RoofReaderRead)(RoofReadInterface *reader, uint8_t **buf, GError **error); +typedef void (*RoofReaderClose)(RoofReadInterface *reader); + +struct _RoofReadInterfaceSettings { + guint padding; // Packet size + padding +}; + +struct _RoofReadInterface { + RoofReaderRead read; + RoofReaderClose close; + + RoofReadInterfaceSettings settings; +}; + +struct _RoofReadContext { + RoofConfig *cfg; + RoofReadInterface *rdi; + + void *rdbuf; // The buffer we are currently processing + guint n_packets; // Number of packets in the buffer + guint packet_id; // Last processed packet in the buffer + guint fragment_id; // Last processed fragment in the packet +}; + +RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error); +void roof_read_context_free(RoofReadContext *ctx); +const RoofReadInterfaceSettings *roof_read_get_settings(UFORoofRead *ctx, GError **error); + + +G_END_DECLS + +#endif /* __ROOF_READ_H */ diff --git a/src/roof-thread.c b/src/roof-thread.c new file mode 100644 index 0000000..570300f --- /dev/null +++ b/src/roof-thread.c @@ -0,0 +1,95 @@ +#include <stdio.h> + +#include "roof.h" +#include "roof-thread.h" + + +RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) { + GError *gerr = NULL; + + RoofThreadContext *rdt = (RoofThreadContext*)calloc(1, sizeof(RoofThreadContext)); + if (!rdt) roof_new_error(error, "Can't allocate RoofThreadContext"); + + rdt->cfg = cfg; + rdt->rdi = rdi; + + return rdt; + +} + +void roof_thread_context_free(RoofThreadContext *rdt) { + if (rdt) { + free(rdt); + } +} + + +int roof_thread_read_socket(Roof * + +int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) { + Roof *ctx = (Roof*)data; + + RoofConfig *cfg = ctx->cfg; + RoofThreadContext *rdt = ctx->rdt[id]; + + guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections }; + guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] }; + + packet_id + fragment_id = + + for (guint stream_id = from; stream_id < to; stream_id++) { + + uint8_t *rdbuf; + guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(RoofPacketHeader)) { + RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } else { + // FIXME: consider consecutive + //fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // FIXME: packet may contain fragments for multiple datasets + fragment_id = packet_id * fragments_per_packet; + guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream; + guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream; + + // FIXME: verify that packet is consecutive + // if + + // Drop packets of already skipped datasets + if (dataset_id < priv->current_id) + continue; + + // FIXME: stop processing and return.... + if (dataset_id < priv->current_id) + + uint8_t *fragment_buffer = dataset_buffer + + stream_id * fragment_dims[0] + // x-coordinate + (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate + + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + + +// buffer->last_id[stream_id] = dataset_id; + + ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + + + } + + } + +} + diff --git a/src/roof-thread.h b/src/roof-thread.h new file mode 100644 index 0000000..a180b29 --- /dev/null +++ b/src/roof-thread.h @@ -0,0 +1,30 @@ +#ifndef __ROOF_THREAD_H +#define __ROOF_THREAD_H + +typedef struct _RoofThreadContext RoofThreadContext; + +#include "ufo-roof-read.h" + +struct _RoofThreadContext { + RoofConfig *cfg; + RoofBuffer *buf; + RoofReadInterface *rdi; + guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive) +}; + +/* +RoofReadThread *guint roof_read_thread_new(RoofRead *rd, guint from, guint to, GError **error); +void roof_read_thread_free(UFORoofReadThread *thr, GError **error); +gboolean roof_read_thread_start(UFORoofReadThread *thr, GError **error); +gboolean roof_read_thread_stop(UFORoofReadThread *thr, GError **error); +*/ + +int roof_thread_read(HWThread thr, void *hwctx, int id, void *data); + +RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, RoofReadContext *rdc, guint from, guint to, GError **error); +void roof_thread_context_free(RoofThreadContext *ctx); + + + +#endif /* __ROOF_THREAD_H */ + diff --git a/src/roof.c b/src/roof.c new file mode 100644 index 0000000..505e3aa --- /dev/null +++ b/src/roof.c @@ -0,0 +1,133 @@ +void roof_init() { + hw_sched_init(); +} + +Roof *roof_new(UfoRoofConfig *cfg, GError **error) { + guint i; + GError *gerr = NULL; + + Roof *ctx = (Roof*)calloc(1, sizeof(Roof)); + if (!ctx) roof_new_error(error, "Can't allocate Roof context"); + + ctx->cfg = cfg; + + ctx->n_threads = cfg->n_streams / cfg->sockets_per_thread; + if (cfg->n_streams % cfg->sockets_per_thread) ctx->n_threads++; + + ctx->rdi = (RoofReadInterface**)calloc(cfg->n_streams, sizeof(RoofReadInterface*)); + ctx->rdc = (RoofReadContext**)calloc(cfg->n_streams, sizeof(RoofReadContext*)); + ctx->rdt = (RoofThreadContext**)calloc(ctx->n_threads, sizeof(RoofThreadContext*)); + ctx->sched = hw_sched_create(cfg->n_threads); + + if ((!ctx->rdi)||(!ctx->rdc)||(!ctx->rdt)||(!ctx->sched)) { + roof_free(ctx); + roof_setup_error(error, "Failed to allocate memory for various Roof contexts"); + } + + return ctx; +} + +void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error) { + assert(ctx); + + ctx->simulate = 1; + ctx->path = path; + ctx->first_file_number = first_file_number; +} + +void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error) { + assert(ctx); + + ctx->max_datasets = max; +} + + +void roof_setup(Roof *ctx, GError **error) { + guint i; + GError *gerr = NULL; + + assert(ctx); + + RoofConfig *cfg = ctx->cfg; + +/* + ctx->buf = roof_buffer_new(cfg, 2, ctx->max_datasets, &gerr); + if ((gerr)||(!ctx->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); +*/ + + for (i = 0; i < cfg->n_streams; i++) { + if (ctx->simulate) { + if (!ctx->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + ctx->rdi[i] = roof_read_file_new(cfg, ctx->path, ctx->first_file_number + i, &gerr); + } else { + ctx->rdi[i] = roof_read_socket_new(cfg, i, &gerr); + } + + if (!ctx->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + + ctx->rdc[i] = roof_read_context_new(cfg, ctx->rdi[i], &gerr); + if (!ctx->rdc[i]) + roof_propagate_error(error, gerr, "roof_read_context_new: "); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint extra = 0, sockets_per_thread = cfg->n_streams / ctx->n_threads; + if (cfg->n_streams % ctx->n_threads) extra = cfg->n_streams - ctx->n_threads * sockets_per_thread; + + guint from, to; + for (i = 0; i < ctx->n_threads; i++) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= roof_thread_new(cfg, ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "roof_thread_new (%i): ", i); + } +} + + +void roof_free(Roof *ctx) { + guint i; + + if (ctx) { + RoofConfig *cfg = ctx->cfg; + if (ctx->sched) hw_sched_destroy(priv->sched); + + if (ctx->rdt) { + for (i = 0; i < ctx->n_threads; i++) + if (ctx->rdt[i]) roof_thread_context_free(ctx->rdt[i]); + free(ctx->rdt); + } + + if (ctx->rdc) { + for (i = 0; i < cfg->n_streams; i++) + if (ctx->rdc[i]) roof_read_context_free(ctx->rdc[i]); + free(ctx->rdc); + } + + if (ctx->rdi) { + for (i = 0; i < cfg->n_streams; i++) + if (ctx->rdi[i]) roof_read_interface_free(ctx->rdi[i]); + free(ctx->rdi); + } + + if (ctx->buf) roof_buffer_free(ctx->buf); + free(ctx); + } +} + + +void roof_read_dataset(Roof *ctx, void *buffer, GError **error) { + priv->current_dataset; + priv->current_buffer = buffer; + + err = hw_sched_schedule_thread_task(sched, (void*)ctx, roof_thread_read); + if (!err) err = hw_sched_wait_task(sched); + if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); } +} + + +} diff --git a/src/roof.h b/src/roof.h new file mode 100644 index 0000000..316e8ed --- /dev/null +++ b/src/roof.h @@ -0,0 +1,46 @@ +#ifndef __ROOF_H +#define __ROOF_H + +typedef struct _Roof Roof; + +#include "roof-config.h" +#include "roof-buffer.h" +#include "roof-read.h" +#include "roof-thread.h" + +struct _Roof { + RoofConfig *cfg; // Parsed ROOF parameters + RoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofReadInterface **rdi; // Reader interface abstraction, one per socket (no threading) + RoofReadContext **rdc; // Reader context: common structures, one per socket (no threading) + RoofThread **rdt; // Threading context: multiple reader contexts per thread + + guint n_threads; // Number of schedulled threads + HWSched sched; // OpenMP-style thread scheduler + + gboolean simulate; // Indicates if we are running in network or simulation modes + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) + + guint max_datasets; // Number of datasets to read + +// guint64 announced; // For debugging +// guint64 generated; // Total number for control + +// struct timespec last_fragment_timestamp; + +}; + + +Roof *roof_new(RoofConfig *cfg, GError **error); +void roof_free(Roof *ctx); + +void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error); +void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error); +//void roof_configure_writer(Roof *ctx, ...); +//void roof_configure_filter(Roof *ctx, ...); +void roof_setup(Roof *ctx, GError **error); + +void roof_read(Roof *ctx, void *buffer, GError **error); + +#endif
\ No newline at end of file diff --git a/src/save/memcpy.c b/src/save/memcpy.c new file mode 100644 index 0000000..5c29d01 --- /dev/null +++ b/src/save/memcpy.c @@ -0,0 +1,344 @@ +/********************************************************************
+ ** File: memcpy.c
+ **
+ ** Copyright (C) 1999-2010 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ ** The following configuration options can be set:
+ **
+ ** LITTLE_ENDIAN - Uses processor with little endian
+ ** addressing. Default is big endian.
+ **
+ ** PRE_INC_PTRS - Use pre increment of pointers.
+ ** Default is post increment of
+ ** pointers.
+ **
+ ** INDEXED_COPY - Copying data using array indexing.
+ ** Using this option, disables the
+ ** PRE_INC_PTRS option.
+ **
+ ** MEMCPY_64BIT - Compiles memcpy for 64 bit
+ ** architectures
+ **
+ **
+ ** Best Settings:
+ **
+ ** Intel x86: LITTLE_ENDIAN and INDEXED_COPY
+ **
+ *******************************************************************/
+
+
+
+/********************************************************************
+ ** Configuration definitions.
+ *******************************************************************/
+
+#define LITTLE_ENDIAN
+#define INDEXED_COPY
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ ** Typedefs
+ *******************************************************************/
+
+typedef unsigned char UInt8;
+typedef unsigned short UInt16;
+typedef unsigned int UInt32;
+#ifdef _WIN32
+typedef unsigned __int64 UInt64;
+#else
+typedef unsigned long long UInt64;
+#endif
+
+#ifdef MEMCPY_64BIT
+typedef UInt64 UIntN;
+#define TYPE_WIDTH 8L
+#else
+typedef UInt32 UIntN;
+#define TYPE_WIDTH 4L
+#endif
+
+
+/********************************************************************
+ ** Remove definitions when INDEXED_COPY is defined.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+#if defined (PRE_INC_PTRS)
+#undef PRE_INC_PTRS
+#endif /*PRE_INC_PTRS*/
+#endif /*INDEXED_COPY*/
+
+
+
+/********************************************************************
+ ** Definitions for pre and post increment of pointers.
+ *******************************************************************/
+
+#if defined (PRE_INC_PTRS)
+
+#define START_VAL(x) (x)--
+#define INC_VAL(x) *++(x)
+#define CAST_TO_U8(p, o) ((UInt8*)p + o + TYPE_WIDTH)
+#define WHILE_DEST_BREAK (TYPE_WIDTH - 1)
+#define PRE_LOOP_ADJUST - (TYPE_WIDTH - 1)
+#define PRE_SWITCH_ADJUST + 1
+
+#else /*PRE_INC_PTRS*/
+
+#define START_VAL(x)
+#define INC_VAL(x) *(x)++
+#define CAST_TO_U8(p, o) ((UInt8*)p + o)
+#define WHILE_DEST_BREAK 0
+#define PRE_LOOP_ADJUST
+#define PRE_SWITCH_ADJUST
+
+#endif /*PRE_INC_PTRS*/
+
+
+
+/********************************************************************
+ ** Definitions for endians
+ *******************************************************************/
+
+#if defined (LITTLE_ENDIAN)
+
+#define SHL >>
+#define SHR <<
+
+#else /* LITTLE_ENDIAN */
+
+#define SHL <<
+#define SHR >>
+
+#endif /* LITTLE_ENDIAN */
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers.
+ *******************************************************************/
+
+#define CP_INCR() { \
+ INC_VAL(dstN) = INC_VAL(srcN); \
+}
+
+#define CP_INCR_SH(shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = INC_VAL(srcN); \
+ dstWord |= srcWord SHR shr; \
+ INC_VAL(dstN) = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses array indexes.
+ *******************************************************************/
+
+#define CP_INDEX(idx) { \
+ dstN[idx] = srcN[idx]; \
+}
+
+#define CP_INDEX_SH(x, shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = srcN[x]; \
+ dstWord |= srcWord SHR shr; \
+ dstN[x] = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers or array indexes depending on
+ ** configuration.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+
+#define CP(idx) CP_INDEX(idx)
+#define CP_SH(idx, shl, shr) CP_INDEX_SH(idx, shl, shr)
+
+#define INC_INDEX(p, o) ((p) += (o))
+
+#else /* INDEXED_COPY */
+
+#define CP(idx) CP_INCR()
+#define CP_SH(idx, shl, shr) CP_INCR_SH(shl, shr)
+
+#define INC_INDEX(p, o)
+
+#endif /* INDEXED_COPY */
+
+
+#define COPY_REMAINING(count) { \
+ START_VAL(dst8); \
+ START_VAL(src8); \
+ \
+ switch (count) { \
+ case 7: INC_VAL(dst8) = INC_VAL(src8); \
+ case 6: INC_VAL(dst8) = INC_VAL(src8); \
+ case 5: INC_VAL(dst8) = INC_VAL(src8); \
+ case 4: INC_VAL(dst8) = INC_VAL(src8); \
+ case 3: INC_VAL(dst8) = INC_VAL(src8); \
+ case 2: INC_VAL(dst8) = INC_VAL(src8); \
+ case 1: INC_VAL(dst8) = INC_VAL(src8); \
+ case 0: \
+ default: break; \
+ } \
+}
+
+#define COPY_NO_SHIFT() { \
+ UIntN* dstN = (UIntN*)(dst8 PRE_LOOP_ADJUST); \
+ UIntN* srcN = (UIntN*)(src8 PRE_LOOP_ADJUST); \
+ size_t length = count / TYPE_WIDTH; \
+ \
+ while (length & 7) { \
+ CP_INCR(); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP(0); \
+ CP(1); \
+ CP(2); \
+ CP(3); \
+ CP(4); \
+ CP(5); \
+ CP(6); \
+ CP(7); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, 0); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+
+#define COPY_SHIFT(shift) { \
+ UIntN* dstN = (UIntN*)((((UIntN)dst8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ UIntN* srcN = (UIntN*)((((UIntN)src8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ size_t length = count / TYPE_WIDTH; \
+ UIntN srcWord = INC_VAL(srcN); \
+ UIntN dstWord; \
+ \
+ while (length & 7) { \
+ CP_INCR_SH(8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP_SH(0, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(1, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(2, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(3, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(4, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(5, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(6, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(7, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, (shift - TYPE_WIDTH)); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest.
+ ** No overlap check is performed.
+ **
+ *******************************************************************/
+
+void *fast_memcpy(void *dest, const void *src, size_t count)
+{
+ UInt8* dst8 = (UInt8*)dest;
+ UInt8* src8 = (UInt8*)src;
+
+ if (count < 8) {
+ COPY_REMAINING(count);
+ return dest;
+ }
+
+ START_VAL(dst8);
+ START_VAL(src8);
+
+ while (((UIntN)dst8 & (TYPE_WIDTH - 1)) != WHILE_DEST_BREAK) {
+ INC_VAL(dst8) = INC_VAL(src8);
+ count--;
+ }
+
+ switch ((((UIntN)src8) PRE_SWITCH_ADJUST) & (TYPE_WIDTH - 1)) {
+ case 0: COPY_NO_SHIFT(); break;
+ case 1: COPY_SHIFT(1); break;
+ case 2: COPY_SHIFT(2); break;
+ case 3: COPY_SHIFT(3); break;
+#if TYPE_WIDTH > 4
+ case 4: COPY_SHIFT(4); break;
+ case 5: COPY_SHIFT(5); break;
+ case 6: COPY_SHIFT(6); break;
+ case 7: COPY_SHIFT(7); break;
+#endif
+ }
+}
diff --git a/src/save/memcpy.h b/src/save/memcpy.h new file mode 100644 index 0000000..0714823 --- /dev/null +++ b/src/save/memcpy.h @@ -0,0 +1,63 @@ +/********************************************************************
+ ** File: memcpy.h
+ **
+ ** Copyright (C) 2005 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ *******************************************************************/
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest. No overlap check
+ ** is performed.
+ **
+ *******************************************************************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void *fast_memcpy(void *dest, const void *src, size_t count);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/save/ufo-roof-buffer-build-task.c b/src/save/ufo-roof-buffer-build-task.c new file mode 100644 index 0000000..bdb7a7d --- /dev/null +++ b/src/save/ufo-roof-buffer-build-task.c @@ -0,0 +1,474 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdio.h> +#include <endian.h> +#include <threads.h> + +#ifdef __APPLE__ +#include <OpenCL/cl.h> +#else +#include <CL/cl.h> +#endif + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" +#include "ufo-roof-build-task.h" + +typedef enum { + BUILD_AUTO = 0, + BUILD_RAW, + BUILD_SINO, + BUILD_UFO +} BuildType; + +struct _UfoRoofBuildTaskPrivate { + gchar *config; // ROOF configuration file name + UfoRoofConfig *cfg; // Parsed ROOF parameters + UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + UfoRoofReadInterface; *rdi; // Reader interfaces, one per socket (no threading) + UfoRoofRead *rd; // Threading interface + + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) + + BuildType build; // What dataset do we build: ROOF sinogram or raw network data + guint number; // Number of datasets to read + gboolean stop; // Stop flag + gboolean simulate; // Indicates if we are running in network or simulation modes + + guint64 announced; // For debugging + guint64 generated; // Total number for control + + struct timespec last_fragment_timestamp; +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) + + + +static GEnumValue build_values[] = { + { BUILD_AUTO, "BUILD_AUTO", "auto" }, + { BUILD_RAW, "BUILD_RAW", "raw" }, + { BUILD_SINO, "BUILD_SINO", "sino" }, + { BUILD_UFO, "BUILD_UFO", "ufo" }, + { 0, NULL, NULL} +}; + +enum { + PROP_0, + PROP_STOP, + PROP_SIMULATE, + PROP_PATH, + PROP_FIRST, + PROP_NUMBER, + PROP_BUILD, + PROP_CONFIG, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_roof_build_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL)); +} + +static void +ufo_roof_build_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + guint i; + GError *gerr = NULL; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + if (!priv->config) + roof_setup_error(error, "ROOF configuration is not specified"); + + priv->cfg = ufo_roof_config_new(priv->config, priv->simulate?UFO_ROOF_CONFIG_SIMULATION:UFO_ROOF_CONFIG_DEFAULT, &gerr); + if (!priv->cfg) + roof_propagate_error(error, gerr, "roof-build-setup: "); + + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr); + } else + priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr); + + if (!priv->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + } + + if (priv->build == BUILD_AUTO) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]); + } + + priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr); + if ((gerr)||(!priv->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); + + priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr); + if (gerr) + roof_propagate_error(error, gerr, "roof_read_new: "); + + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); +} + +static void +ufo_roof_build_task_finalize (GObject *object) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->rd) { + ufo_roof_read_free(priv->rd); + priv->rd = NULL; + } + + if (priv->buf) { + ufo_roof_buffer_free(priv->buf); + priv->buf = NULL; + } + + if (priv->cfg) { + ufo_roof_config_free(priv->cfg); + priv->cfg = NULL; + } + + if (priv->config) { + g_free(priv->config); + priv->config = NULL; + } + + if (priv->path) { + g_free(priv->path); + priv->path = NULL; + } + + G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); +} + + + +static void +ufo_roof_build_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition, + GError **error) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + // FIXME: Can we handle data types more elegant? + if (priv->build == BUILD_RAW) { + guint bytes = priv->cfg->dataset_size; + requisition->n_dims = 1; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + } else if (priv->build == BUILD_SINO) { + guint bytes = priv->cfg->fan_bins * priv->cfg->bit_depth / 8; + requisition->n_dims = 2; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + requisition->dims[1] = priv->cfg->fan_projections; + } else if (priv->build == BUILD_UFO) { + requisition->n_dims = 2; + requisition->dims[0] = priv->cfg->fan_bins; + requisition->dims[1] = priv->cfg->fan_projections; + } +} + +static guint +ufo_roof_build_task_get_num_inputs (UfoTask *task) +{ + return 1; +} + +static guint +ufo_roof_build_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 1; +} + +static UfoTaskMode +ufo_roof_build_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; +} + + +static gboolean +ufo_roof_build_task_generate (UfoTask *task, + UfoBuffer *output, + UfoRequisition *requisition) +{ + gboolean ready = FALSE; + gulong seqid; + GError *gerr = NULL; + GValue ival = G_VALUE_INIT; + GValue lval = G_VALUE_INIT; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + UfoRoofConfig *cfg = priv->cfg; + UfoRoofBuffer *buf = priv->buf; + + void *output_buffer = ufo_buffer_get_host_array(output, NULL); + + if (priv->stop) + return FALSE; + + // FIXME: Wait or break. Not both. + do { + ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr); + if (gerr) roof_print_error(gerr); + + if (!ready) { + ready = ufo_roof_buffer_skip_to_ready(buf); + if (!ready) { + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + return FALSE; + } + } + } while (!ready); + + // FIXME: integrate fastwriter somewhere here? + + if (priv->build == BUILD_UFO) { + switch (cfg->bit_depth) { + case 8: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_8U); + break; + case 16: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_16U); + break; + case 32: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_32U); + break; + default: + printf("Usupported bit-depth %u\n", cfg->bit_depth); + } + } + + // Metadata: plane and sequential number within the plane + g_value_init (&ival, G_TYPE_UINT); + g_value_init (&lval, G_TYPE_ULONG); + if (priv->build != BUILD_UFO) { + g_value_set_uint (&ival, cfg->bit_depth); + ufo_buffer_set_metadata (output, "bpp", &ival); + } + g_value_set_uint (&ival, 1 + seqid % cfg->n_planes); + ufo_buffer_set_metadata (output, "plane", &ival); + g_value_set_ulong (&lval, seqid / cfg->n_planes); + ufo_buffer_set_metadata (output, "plane_id", &lval); + g_value_set_ulong (&lval, seqid); + ufo_buffer_set_metadata (output, "seqid", &lval); + g_value_unset(&lval); + g_value_unset(&ival); + + // FIXME: Or shall we start from counting from the ID of the first registerd dataset + if ((priv->number)&&(buf->current_id >= priv->number)) { +// printf("%lu datasets processed, stopping\n", buf->current_id); + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + } + + if (ready) priv->generated++; + + if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { + if (ready) + printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + else + printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + priv->announced = buf->current_id; + } + + return ready; +} + +static void +ufo_roof_build_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + if (priv->config) g_free(priv->config); + priv->config = g_value_dup_string(value); + break; + case PROP_STOP: + priv->stop = g_value_get_boolean (value); + break; + case PROP_SIMULATE: + priv->simulate = g_value_get_boolean (value); + break; + case PROP_PATH: + if (priv->path) g_free(priv->path); + priv->path = g_value_dup_string(value); + break; + case PROP_FIRST: + priv->first_file_number = g_value_get_uint (value); + break; + case PROP_NUMBER: + priv->number = g_value_get_uint (value); + break; + case PROP_BUILD: + priv->build = g_value_get_enum (value); + if ((priv->build == BUILD_AUTO)&&(priv->cfg)) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + } + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_roof_build_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + g_value_set_string(value, priv->config); + break; + case PROP_STOP: + g_value_set_boolean (value, priv->stop); + break; + case PROP_SIMULATE: + g_value_set_boolean (value, priv->simulate); + break; + case PROP_PATH: + g_value_set_string(value, priv->path?priv->path:""); + break; + case PROP_FIRST: + g_value_set_uint (value, priv->first_file_number); + break; + case PROP_NUMBER: + g_value_set_uint (value, priv->number); + break; + case PROP_BUILD: + g_value_set_enum (value, priv->build); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_roof_build_task_setup; + iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; + iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; + iface->get_mode = ufo_roof_build_task_get_mode; + iface->get_requisition = ufo_roof_build_task_get_requisition; + iface->generate = ufo_roof_build_task_generate; +} + +static void +ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_roof_build_task_set_property; + oclass->get_property = ufo_roof_build_task_get_property; + oclass->finalize = ufo_roof_build_task_finalize; + + properties[PROP_CONFIG] = + g_param_spec_string ("config", + "ROOF configuration", + "Path to ROOF configuration file", + "", + G_PARAM_READWRITE); + + properties[PROP_STOP] = + g_param_spec_boolean ("stop", + "Stop flag", + "Stop socket servers and terminates filter execution", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_SIMULATE] = + g_param_spec_boolean ("simulate", + "Simulation mode", + "Read data from the specified files instead of network", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_PATH] = + g_param_spec_string ("path", + "Input files for simulation mode", + "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)", + "", + G_PARAM_READWRITE); + + properties[PROP_FIRST] = + g_param_spec_uint ("first_file_number", + "Offset to the first read file", + "Offset to the first read file", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_NUMBER] = + g_param_spec_uint("number", + "Number of datasets to receive", + "Number of datasets to receive", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_BUILD] = + g_param_spec_enum ("build", + "Build type (\"raw\", \"sino\", \"ufo\")", + "Build type (\"raw\" - raw data, \"sino\" - arrange in sinogram, \"ufo\" - arrange in sinogram and convert UFO floating-point format)", + g_enum_register_static ("build", build_values), + 0, G_PARAM_READWRITE); + + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); +} + +static void +ufo_roof_build_task_init(UfoRoofBuildTask *self) +{ + self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); +} diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c new file mode 100644 index 0000000..60868d2 --- /dev/null +++ b/src/save/ufo-roof-read-thread.c @@ -0,0 +1,155 @@ +#include <stdio.h> +#include <threads.h> + +#include <ufo/ufo.h> + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) { + int i; + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context"); + + thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size); + if (!thr->rdbuf) { + ufo_roof_read_thread_free(thr); + roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer"); + } + + thr->rd = rd; + thr->from = from; + thr->to = to; + + + return thr; + +} + +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) { + if (!thr) return; + if (thr->rdbuf) free(thr->rdbuf); + + ufo_roof_thread_stop(thr, error); + free(thr); +} + +static int ufo_roof_read_thread_run(void *arg) { + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)arg; + + UfoRoofConfig *cfg = thr->rd->cfg; + UfoRoofBuffer *buf = thr->rd->buf; + UfoRoofReadInterface *rdi = thr->rd->rdi; + + guint from = thr->from; + guint to = thr->to; + + void *rdbuf = thr->rdbuf; + + uint64_t current_id[to - from] = {0}; + uint64_t grabbed[to - from] = {0}; + + static uint64_t errors = 0; + + while (thr->op != UFO_ROOF_OP_STOP) { + for (guint sid = from; sid < to; sid++) { + // FIXME break waiting on stop? If no packets are send + guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + guint ready = false; + const uint8_t *fragment = (uint8_t*)rdbuf; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + +#ifdef UFO_ROOF_DEBUG + if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + + current_id[sid - from] = packet_id; + grabbed[sid - from]++; + if ((grabbed[sid - from]%1000000)==0) + printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000); +#endif + + ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + } // fragment-loop + + // send notification? Broadcast blocks, we don't want it. + if (ready) { + } + + } // socket-loop + } // operation-loop + + +#ifdef UFO_ROOF_DEBUG + // Store first received packet on each channel... + static int debug = 1; + if (debug) { + char fname[256]; + sprintf(fname, "channel%i_packet0.raw", priv->id); + FILE *f = fopen(fname, "w"); + if (f) { + fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f); + fclose(f); + } + debug = 0; + } +#endif /* UFO_ROOF_DEBUG */ + + // FIXME: End of data (shall we restart in the network case?) +// if (!packets) +// return FALSE; + + // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? + header->channel_id = priv->id; +// header->n_packets = packets; + + return TRUE; +} + + +} + +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) { + int err; + if (!thr) return FALSE; + + err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err); + + ctx->launched = TRUE; + return TRUE; +} + +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) { + int err, ret; + if (!thr) return FALSE; + if (!thr->launched) return TRUE; + + // Signal thread termination + + err = thrd_join(&thr->thread, &ret); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err); + + return TRUE; +} + diff --git a/src/save/ufo-roof-read-thread.h b/src/save/ufo-roof-read-thread.h new file mode 100644 index 0000000..ebe8989 --- /dev/null +++ b/src/save/ufo-roof-read-thread.h @@ -0,0 +1,23 @@ +#ifndef __UFO_ROOF_READ_THREAD_H +#define __UFO_ROOF_READ_THREAD_H + +typedef struct _UfoRoofReadThread UfoRoofReadThread; + +#include "ufo-roof-read.h" + +struct _UfoRoofReadThread { + UfoRoofRead *rd; // ROOF Reader Cotext + guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive) + + gboolean launched; // Flag indicating if thread is launched + thrd_t thread; // Thread ID +}; + +/* +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error); +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error); +*/ + +#endif /* __UFO_ROOF_READ_THREAD_H */
\ No newline at end of file diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c new file mode 100644 index 0000000..f3d790d --- /dev/null +++ b/src/save/ufo-roof-read.c @@ -0,0 +1,123 @@ +#include <stdio.h> +#include <assert.h> + +#include <ufo/ufo.h> + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + + +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +static void ufo_roof_read_optimize(UfoRoofConfig *cfg) { + // FIXME: request real-time permissions? + // FIXME: do we need this? +/* + uint32_t lat = 0; + int fd = open("/dev/cpu_dma_latency", O_RDWR); + if (fd == -1) { + fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno)); + } else { + write(fd, &lat, sizeof(lat)); + close(fd); + } +*/ +} + + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) { + assert(ctx); + return ctx->settings; +} + + +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) { + guint i; + GError *gerr = NULL; + + UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context"); + + ufo_roof_read_optimize(ctx); + + ctx->n_threads = cfg->n_read_threads; + ctx->cfg = cfg; + ctx->buf = buf; + ctx->rdi = rdi; + + ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread)); + if (!ctx->thr) { + ufo_roof_read_free(ctx); + roof_new_error(error, "Error allocating UfoRoofReadThread contexts"); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++; + ctx->n_threads = n_threads; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads; + if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread; + + guint from, to; + for (i = 0, from = 0; i < n_threads; i++, from = to) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + + return ctx; +} + +void ufo_roof_read_free(UfoRoofRead *ctx) { + if (!ctx) return; + + if (ctx->thr) { + int i; + ufo_roof_read_stop(ctx); + for (i = 0; i < ctx->n_threads; i++) { + if (ctx->thr[i]) + ufo_roof_read_thread_free(ctx->thr[i]); + } + free(ctx->thr); + } + free(ctx); +} + +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) { + gboolean ret; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_start(ctx, &gerr); + if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i); + } + return TRUE; +} + +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) { + gboolean ret, res = FALSE; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_stop(ctx, &gerr); + if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i); + res |= !ret; + } + return !res; +} + + diff --git a/src/save/ufo-roof-read.h b/src/save/ufo-roof-read.h new file mode 100644 index 0000000..16e910b --- /dev/null +++ b/src/save/ufo-roof-read.h @@ -0,0 +1,61 @@ +#ifndef __UFO_ROOF_READ_H +#define __UFO_ROOF_READ_H + +typedef struct _UfoRoofRead UfoRoofRead; + +#include "ufo-roof-config.h" +#include "ufo-roof-buffer.h" +//#include "ufo-roof-read-thread.h" + +G_BEGIN_DECLS + +typedef struct _UfoRoofReadInterface UfoRoofReadInterface; +typedef struct _UfoRoofReadInterfaceSettings UfoRoofReadInterfaceSettings; + +//typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error); + +typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t **buf, GError **error); +typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader); + +struct _UfoRoofReadInterfaceSettings { + guint padding; // Packet size + padding +}; + +struct _UfoRoofReadInterface { + UfoRoofReaderRead read; + UfoRoofReaderClose close; + + UfoRoofReadInterfaceSettings settings; +}; + +typedef enum { + UFO_ROOF_OP_STOP = 0, + UFO_ROOF_OP_READ +} UfoRoofOp; + +struct _UfoRoofReadContext { + UfoRoofConfig *cfg; + UfoRoofBuffer *buf; + UfoRoofReadInterface *rdi; + + gulong packet +// UfoRoofReadThread *thr; + + guint n_threads; + UfoRoofOp op; // Current operation (reading by default) +}; + + + +/* +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error); +void ufo_roof_read_free(UfoRoofRead *ctx); +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error); +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error); + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error); +*/ + +G_END_DECLS + +#endif /* __UFO_ROOF_READ_H */ diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c index a39ed11..ee1cec5 100644 --- a/src/ufo-roof-build-task.c +++ b/src/ufo-roof-build-task.c @@ -19,6 +19,7 @@ #include <stdio.h> #include <endian.h> +#include <threads.h> #ifdef __APPLE__ #include <OpenCL/cl.h> @@ -26,10 +27,16 @@ #include <CL/cl.h> #endif +#include "hw_sched.h" + #include "ufo-roof.h" +#include "ufo-roof-read.h" #include "ufo-roof-buffer.h" +#include "ufo-roof-read-socket.h" +#include "ufo-roof-read-file.h" #include "ufo-roof-build-task.h" + typedef enum { BUILD_AUTO = 0, BUILD_RAW, @@ -37,10 +44,16 @@ typedef enum { BUILD_UFO } BuildType; -struct _UfoRoofBuildTaskPrivate { +struct _RoofBuildTaskPrivate { gchar *config; // ROOF configuration file name - UfoRoofConfig *cfg; // Parsed ROOF parameters - UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofConfig *cfg; // Parsed ROOF parameters +// RoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofReadInterface **rdi; // Reader interfaces, one per socket (no threading) +// RoofRead *rd; // Threading interface + HWSched sched; + + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) BuildType build; // What dataset do we build: ROOF sinogram or raw network data guint number; // Number of datasets to read @@ -48,17 +61,21 @@ struct _UfoRoofBuildTaskPrivate { gboolean simulate; // Indicates if we are running in network or simulation modes guint64 announced; // For debugging + guint64 generated; // Total number for control + + guint n_threads; // Number of schedulled threads + struct timespec last_fragment_timestamp; }; static void ufo_task_interface_init (UfoTaskIface *iface); -G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, +G_DEFINE_TYPE_WITH_CODE (RoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, ufo_task_interface_init)) -#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, RoofBuildTaskPrivate)) @@ -74,6 +91,8 @@ enum { PROP_0, PROP_STOP, PROP_SIMULATE, + PROP_PATH, + PROP_FIRST, PROP_NUMBER, PROP_BUILD, PROP_CONFIG, @@ -93,9 +112,10 @@ ufo_roof_build_task_setup (UfoTask *task, UfoResources *resources, GError **error) { + guint i; GError *gerr = NULL; - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); if (!priv->config) roof_setup_error(error, "ROOF configuration is not specified"); @@ -104,15 +124,65 @@ ufo_roof_build_task_setup (UfoTask *task, if (!priv->cfg) roof_propagate_error(error, gerr, "roof-build-setup: "); + priv->rdi = (RoofReadInterface**)calloc(priv->cfg->n_streams, sizeof(RoofReadInterface*)); + if (!priv->rdi) + roof_setup_error(error, "Failed to allocate memory for RoofReadInterface array"); + + + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->first_file_number + i, &gerr); + } else + priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, i, &gerr); + + if (!priv->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + + priv->rdc[i] = ufo_roof_read_context_new(priv->cfg, priv->rdi[i], &gerr); + if (!priv->rdc[i]) + roof_propagate_error(error, gerr, "roof_read_context_new: "); + } + + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + priv->n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) priv->n_threads++; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / priv->n_threads; + if (priv->cfg->n_streams % priv->n_threads) extra = priv->cfg->n_streams - priv->n_threads * sockets_per_thread; + + guint from, to; + for (i = 0; i < priv->n_threads; i++) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(priv->cfg, priv->rdc, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + if (priv->build == BUILD_AUTO) { if (priv->cfg->roof_mode) priv->build = BUILD_SINO; else priv->build = BUILD_RAW; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]); } +/* priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr); - if (!priv->buf) - roof_propagate_error(error, gerr, "roof-build-setup: "); + if ((gerr)||(!priv->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); + + priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr); + if (gerr) + roof_propagate_error(error, gerr, "roof_read_new: "); +*/ + + priv->sched = hw_sched_create(priv->cfg->n_read_threads); + if (!priv->sched) + roof_setup_error(error, "Failed to schedule builder threads"); + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); } @@ -120,12 +190,34 @@ ufo_roof_build_task_setup (UfoTask *task, static void ufo_roof_build_task_finalize (GObject *object) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); - + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->sched) { + hw_sched_destroy(priv->sched); + priv->sched = NULL; + } + +/* + if (priv->rd) { + ufo_roof_read_free(priv->rd); + priv->rd = NULL; + } + if (priv->buf) { ufo_roof_buffer_free(priv->buf); priv->buf = NULL; } +*/ + + if (priv->rdi) { + guint i; + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->rdi[i]) + priv->rdi[i]->close(priv->rdi[i]); + } + free(priv->rdi); + priv->rdi = NULL; + } if (priv->cfg) { ufo_roof_config_free(priv->cfg); @@ -137,6 +229,10 @@ ufo_roof_build_task_finalize (GObject *object) priv->config = NULL; } + if (priv->path) { + g_free(priv->path); + priv->path = NULL; + } G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); } @@ -149,9 +245,10 @@ ufo_roof_build_task_get_requisition (UfoTask *task, UfoRequisition *requisition, GError **error) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); // FIXME: Can we handle data types more elegant? + // FIXME: Kill BUILD_RAW ? if (priv->build == BUILD_RAW) { guint bytes = priv->cfg->dataset_size; requisition->n_dims = 1; @@ -184,100 +281,56 @@ ufo_roof_build_task_get_num_dimensions (UfoTask *task, static UfoTaskMode ufo_roof_build_task_get_mode (UfoTask *task) { - return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR; + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; } + static gboolean -ufo_roof_build_task_process (UfoTask *task, - UfoBuffer **inputs, +ufo_roof_build_task_generate (UfoTask *task, UfoBuffer *output, UfoRequisition *requisition) { - GError *gerr = NULL; - - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); - UfoRoofConfig *cfg = priv->cfg; - UfoRoofBuffer *buf = priv->buf; gboolean ready = FALSE; + gulong seqid; + GError *gerr = NULL; + GValue ival = G_VALUE_INIT; + GValue lval = G_VALUE_INIT; -// UfoRequisition in_req; -// ufo_buffer_get_requisition (inputs[0], &in_req); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofConfig *cfg = priv->cfg; + RoofBuffer *buf = priv->buf; - const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL); - UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg); + void *output_buffer = ufo_buffer_get_host_array(output, NULL); if (priv->stop) return FALSE; - const uint8_t *fragment = data; - for (guint i = 0; i < header->n_packets; i++) { - guint64 packet_id = 0; - // Otherwise considered consecutive and handled by the buffer - if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { - UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); - packet_id = be64toh(pheader->packet_id) + 1; - } + priv->current_dataset; + priv->current_buffer = output_buffer; - // FIXME: Can we kill here the dataset finished during the previous step of iteration - ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr); - if (gerr) roof_print_error(gerr); + err = hw_sched_schedule_thread_task(sched, (void*)&tnv_ctx, ufo_roof_build_task_read); + if (!err) err = hw_sched_wait_task(sched); + if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); } - fragment += cfg->max_packet_size; - } - - // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one? - - if (ready) { - clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); - } else { - struct timespec current_time; - clock_gettime(CLOCK_REALTIME, ¤t_time); - - // No new accepted events within timeout - if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) { - ready = ufo_roof_buffer_skip_to_ready(buf); - if (ready) { - // FIXME: shall we really reset timer here? - clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); - } else { +/* + // FIXME: Wait or break. Not both. + do { + ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr); + if (gerr) roof_print_error(gerr); + + if (!ready) { + ready = ufo_roof_buffer_skip_to_ready(buf); + if (!ready) { priv->stop = TRUE; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); - } + return FALSE; + } } - } - - -/* - printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets, - (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0); + } while (!ready); */ - return !ready; -} - -static gboolean -ufo_roof_build_task_generate (UfoTask *task, - UfoBuffer *output, - UfoRequisition *requisition) -{ - gboolean ready = FALSE; - gulong seqid; - GError *gerr = NULL; - GValue ival = G_VALUE_INIT; - GValue lval = G_VALUE_INIT; - - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); - UfoRoofConfig *cfg = priv->cfg; - UfoRoofBuffer *buf = priv->buf; - - void *output_buffer = ufo_buffer_get_host_array(output, NULL); - - if (priv->stop) - return FALSE; - - ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &seqid, &gerr); - if (gerr) roof_print_error(gerr); + // FIXME: integrate fastwriter somewhere here? if (priv->build == BUILD_UFO) { switch (cfg->bit_depth) { @@ -318,8 +371,13 @@ ufo_roof_build_task_generate (UfoTask *task, g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); } + if (ready) priv->generated++; + if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { - printf("Processing dataset %li (%s), next: %u out of %u\n", buf->current_id + (ready?0:1), (ready?"ready ":"timeout "), buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + if (ready) + printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + else + printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); priv->announced = buf->current_id; } @@ -332,7 +390,7 @@ ufo_roof_build_task_set_property (GObject *object, const GValue *value, GParamSpec *pspec) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); switch (property_id) { case PROP_CONFIG: @@ -345,6 +403,13 @@ ufo_roof_build_task_set_property (GObject *object, case PROP_SIMULATE: priv->simulate = g_value_get_boolean (value); break; + case PROP_PATH: + if (priv->path) g_free(priv->path); + priv->path = g_value_dup_string(value); + break; + case PROP_FIRST: + priv->first_file_number = g_value_get_uint (value); + break; case PROP_NUMBER: priv->number = g_value_get_uint (value); break; @@ -367,11 +432,11 @@ ufo_roof_build_task_get_property (GObject *object, GValue *value, GParamSpec *pspec) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); switch (property_id) { case PROP_CONFIG: - g_value_set_string(value, priv->config); + g_value_set_string(value, priv->config); break; case PROP_STOP: g_value_set_boolean (value, priv->stop); @@ -379,6 +444,12 @@ ufo_roof_build_task_get_property (GObject *object, case PROP_SIMULATE: g_value_set_boolean (value, priv->simulate); break; + case PROP_PATH: + g_value_set_string(value, priv->path?priv->path:""); + break; + case PROP_FIRST: + g_value_set_uint (value, priv->first_file_number); + break; case PROP_NUMBER: g_value_set_uint (value, priv->number); break; @@ -394,17 +465,18 @@ ufo_roof_build_task_get_property (GObject *object, static void ufo_task_interface_init (UfoTaskIface *iface) { + roof_init(); + iface->setup = ufo_roof_build_task_setup; iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; iface->get_mode = ufo_roof_build_task_get_mode; iface->get_requisition = ufo_roof_build_task_get_requisition; - iface->process = ufo_roof_build_task_process; iface->generate = ufo_roof_build_task_generate; } static void -ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +ufo_roof_build_task_class_init (RoofBuildTaskClass *klass) { GObjectClass *oclass = G_OBJECT_CLASS (klass); @@ -426,7 +498,6 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) FALSE, G_PARAM_READWRITE); - properties[PROP_SIMULATE] = g_param_spec_boolean ("simulate", "Simulation mode", @@ -434,6 +505,20 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) FALSE, G_PARAM_READWRITE); + properties[PROP_PATH] = + g_param_spec_string ("path", + "Input files for simulation mode", + "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)", + "", + G_PARAM_READWRITE); + + properties[PROP_FIRST] = + g_param_spec_uint ("first_file_number", + "Offset to the first read file", + "Offset to the first read file", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + properties[PROP_NUMBER] = g_param_spec_uint("number", "Number of datasets to receive", @@ -452,11 +537,11 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) g_object_class_install_property (oclass, i, properties[i]); - g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); + g_type_class_add_private (oclass, sizeof(RoofBuildTaskPrivate)); } static void -ufo_roof_build_task_init(UfoRoofBuildTask *self) +ufo_roof_build_task_init(RoofBuildTask *self) { self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); } diff --git a/src/ufo-roof-read-file.h b/src/ufo-roof-read-file.h deleted file mode 100644 index 787b441..0000000 --- a/src/ufo-roof-read-file.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef __UFO_ROOF_READ_FILE_H -#define __UFO_ROOF_READ_FILE_H - -#include "ufo-roof-read.h" - -UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error); - -#endif diff --git a/src/ufo-roof-read-socket.h b/src/ufo-roof-read-socket.h deleted file mode 100644 index 74b0742..0000000 --- a/src/ufo-roof-read-socket.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef __UFO_ROOF_READ_SOCKET_H -#define __UFO_ROOF_READ_SOCKET_H - -#include "ufo-roof-read.h" - -UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error); - -#endif diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c index 7d55b79..83b9627 100644 --- a/src/ufo-roof-read-task.c +++ b/src/ufo-roof-read-task.c @@ -35,7 +35,7 @@ struct _UfoRoofReadTaskPrivate { gchar *config; // ROOF configuration file name UfoRoofConfig *cfg; // Parsed ROOF parameters - UfoRoofReadInterface *reader; + UfoRoofReadInterface *reader[16]; guint id; // Reader ID (defince sequential port number) gboolean stop; // Flag requiring termination @@ -76,6 +76,7 @@ ufo_roof_read_task_setup (UfoTask *task, UfoResources *resources, GError **error) { + guint i; GError *gerr = NULL; UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task); @@ -91,17 +92,19 @@ ufo_roof_read_task_setup (UfoTask *task, roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams); // Start actual reader - if (priv->simulate) { - if (!priv->path) - roof_setup_error(error, "Path to simulated data should be specified"); - - priv->reader = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id + priv->first_file_number, &gerr); - } else - priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr); - if (!priv->reader) - roof_propagate_error(error, gerr, "roof_read_new: "); + for (i = 0; (i < priv->cfg->sockets_per_thread)&&((priv->id * priv->cfg->sockets_per_thread + i) < priv->cfg->n_streams); i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->reader[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr); + } else + priv->reader[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr); + if (!priv->reader[i]) + roof_propagate_error(error, gerr, "roof_read_new: "); + } } @@ -110,8 +113,10 @@ ufo_roof_read_task_finalize (GObject *object) { UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object); - if (priv->reader) { - priv->reader->close(priv->reader); + for (guint i = 0; i < priv->cfg->sockets_per_thread; i++) { + if (priv->reader[i]) { + priv->reader[i]->close(priv->reader[i]); + } } if (priv->cfg) { @@ -180,16 +185,47 @@ ufo_roof_read_task_generate (UfoTask *task, void *output_buffer = ufo_buffer_get_host_array(output, NULL); UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg); + uint64_t current_id[16] = {0}; + uint64_t grabbed[16] = {0}; + + static uint64_t errors = 0; +retry: if (priv->stop) return FALSE; - guint packets = priv->reader->read(priv->reader, output_buffer, &gerr); + for (guint sid = 0; (sid < cfg->sockets_per_thread)&&((priv->id * cfg->sockets_per_thread + sid) < priv->cfg->n_streams); sid++) { + + guint packets = priv->reader[sid]->read(priv->reader[sid], output_buffer, &gerr); if (gerr) { g_warning("Error reciving data: %s", gerr->message); g_error_free(gerr); return FALSE; } + const uint8_t *fragment = output_buffer; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + + if ((current_id[sid])&&(current_id[sid] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + current_id[sid] = packet_id; + grabbed[sid]++; + if ((grabbed[sid]%1000000)==0) printf("Channel %i(%i): Grabbed %lu Mpackets\n", priv->id * cfg->sockets_per_thread + sid, sid, grabbed[sid]/1000000); + + fragment += cfg->max_packet_size; + } + } + + goto retry; + #ifdef UFO_ROOF_DEBUG // Store first received packet on each channel... static int debug = 1; @@ -206,12 +242,12 @@ ufo_roof_read_task_generate (UfoTask *task, #endif /* UFO_ROOF_DEBUG */ // FIXME: End of data (shall we restart in the network case?) - if (!packets) - return FALSE; +// if (!packets) +// return FALSE; // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? header->channel_id = priv->id; - header->n_packets = packets; +// header->n_packets = packets; return TRUE; } diff --git a/src/ufo-roof-read.h b/src/ufo-roof-read.h deleted file mode 100644 index 5f0853c..0000000 --- a/src/ufo-roof-read.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef __UFO_ROOF_READ_H -#define __UFO_ROOF_READ_H - -#include "ufo-roof-config.h" - -typedef struct _UfoRoofReadInterface UfoRoofReadInterface; - -typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error); -typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader); - -struct _UfoRoofReadInterface { - UfoRoofReaderRead read; - UfoRoofReaderClose close; -}; - - -#endif /* __UFO_ROOF_READ_H */ diff --git a/src/ufo-roof.h b/src/ufo-roof.h index 23f8429..ea422e2 100644 --- a/src/ufo-roof.h +++ b/src/ufo-roof.h @@ -1,21 +1,21 @@ -#ifndef __UFO_ROOF_H -#define __UFO_ROOF_H +#ifndef __ROOF_H +#define __ROOF_H #include "ufo-roof-config.h" #include "ufo-roof-error.h" -//#define UFO_ROOF_DEBUG -#define UFO_ROOF_PACKET_HEADER(buf) ((UfoRoofPacketHeader*)(buf)) -#define UFO_ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((UfoRoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size)) +//#define ROOF_DEBUG +#define ROOF_PACKET_HEADER(buf) ((RoofPacketHeader*)(buf)) +#define ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((RoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size)) typedef struct { uint64_t packet_id; // Sequential Packet ID (numbered from 0) -} UfoRoofPacketHeader; +} RoofPacketHeader; typedef struct { uint32_t channel_id; // Specifies channel on which the data were received (numbered from 0) uint32_t n_packets; // Number of packets -} UfoRoofPacketBlockHeader; +} RoofPacketBlockHeader; -#endif /* __UFO_ROOF_H */ +#endif /* __ROOF_H */ |