From 5172421d248250b4ab3b69eb57fd83656e23a4da Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Thu, 3 Sep 2020 03:00:30 +0200 Subject: This is unfinished work implemeting out-of-UFO network servers --- src/save/memcpy.c | 344 ++++++++++++++++++++++++ src/save/memcpy.h | 63 +++++ src/save/ufo-roof-buffer-build-task.c | 474 ++++++++++++++++++++++++++++++++++ src/save/ufo-roof-read-thread.c | 155 +++++++++++ src/save/ufo-roof-read-thread.h | 23 ++ src/save/ufo-roof-read.c | 123 +++++++++ src/save/ufo-roof-read.h | 61 +++++ 7 files changed, 1243 insertions(+) create mode 100644 src/save/memcpy.c create mode 100644 src/save/memcpy.h create mode 100644 src/save/ufo-roof-buffer-build-task.c create mode 100644 src/save/ufo-roof-read-thread.c create mode 100644 src/save/ufo-roof-read-thread.h create mode 100644 src/save/ufo-roof-read.c create mode 100644 src/save/ufo-roof-read.h (limited to 'src/save') diff --git a/src/save/memcpy.c b/src/save/memcpy.c new file mode 100644 index 0000000..5c29d01 --- /dev/null +++ b/src/save/memcpy.c @@ -0,0 +1,344 @@ +/******************************************************************** + ** File: memcpy.c + ** + ** Copyright (C) 1999-2010 Daniel Vik + ** + ** This software is provided 'as-is', without any express or implied + ** warranty. In no event will the authors be held liable for any + ** damages arising from the use of this software. + ** Permission is granted to anyone to use this software for any + ** purpose, including commercial applications, and to alter it and + ** redistribute it freely, subject to the following restrictions: + ** + ** 1. The origin of this software must not be misrepresented; you + ** must not claim that you wrote the original software. If you + ** use this software in a product, an acknowledgment in the + ** use this software in a product, an acknowledgment in the + ** product documentation would be appreciated but is not + ** required. + ** + ** 2. Altered source versions must be plainly marked as such, and + ** must not be misrepresented as being the original software. + ** + ** 3. This notice may not be removed or altered from any source + ** distribution. + ** + ** + ** Description: Implementation of the standard library function memcpy. + ** This implementation of memcpy() is ANSI-C89 compatible. + ** + ** The following configuration options can be set: + ** + ** LITTLE_ENDIAN - Uses processor with little endian + ** addressing. Default is big endian. + ** + ** PRE_INC_PTRS - Use pre increment of pointers. + ** Default is post increment of + ** pointers. + ** + ** INDEXED_COPY - Copying data using array indexing. + ** Using this option, disables the + ** PRE_INC_PTRS option. + ** + ** MEMCPY_64BIT - Compiles memcpy for 64 bit + ** architectures + ** + ** + ** Best Settings: + ** + ** Intel x86: LITTLE_ENDIAN and INDEXED_COPY + ** + *******************************************************************/ + + + +/******************************************************************** + ** Configuration definitions. + *******************************************************************/ + +#define LITTLE_ENDIAN +#define INDEXED_COPY + + +/******************************************************************** + ** Includes for size_t definition + *******************************************************************/ + +#include + + +/******************************************************************** + ** Typedefs + *******************************************************************/ + +typedef unsigned char UInt8; +typedef unsigned short UInt16; +typedef unsigned int UInt32; +#ifdef _WIN32 +typedef unsigned __int64 UInt64; +#else +typedef unsigned long long UInt64; +#endif + +#ifdef MEMCPY_64BIT +typedef UInt64 UIntN; +#define TYPE_WIDTH 8L +#else +typedef UInt32 UIntN; +#define TYPE_WIDTH 4L +#endif + + +/******************************************************************** + ** Remove definitions when INDEXED_COPY is defined. + *******************************************************************/ + +#if defined (INDEXED_COPY) +#if defined (PRE_INC_PTRS) +#undef PRE_INC_PTRS +#endif /*PRE_INC_PTRS*/ +#endif /*INDEXED_COPY*/ + + + +/******************************************************************** + ** Definitions for pre and post increment of pointers. + *******************************************************************/ + +#if defined (PRE_INC_PTRS) + +#define START_VAL(x) (x)-- +#define INC_VAL(x) *++(x) +#define CAST_TO_U8(p, o) ((UInt8*)p + o + TYPE_WIDTH) +#define WHILE_DEST_BREAK (TYPE_WIDTH - 1) +#define PRE_LOOP_ADJUST - (TYPE_WIDTH - 1) +#define PRE_SWITCH_ADJUST + 1 + +#else /*PRE_INC_PTRS*/ + +#define START_VAL(x) +#define INC_VAL(x) *(x)++ +#define CAST_TO_U8(p, o) ((UInt8*)p + o) +#define WHILE_DEST_BREAK 0 +#define PRE_LOOP_ADJUST +#define PRE_SWITCH_ADJUST + +#endif /*PRE_INC_PTRS*/ + + + +/******************************************************************** + ** Definitions for endians + *******************************************************************/ + +#if defined (LITTLE_ENDIAN) + +#define SHL >> +#define SHR << + +#else /* LITTLE_ENDIAN */ + +#define SHL << +#define SHR >> + +#endif /* LITTLE_ENDIAN */ + + + +/******************************************************************** + ** Macros for copying words of different alignment. + ** Uses incremening pointers. + *******************************************************************/ + +#define CP_INCR() { \ + INC_VAL(dstN) = INC_VAL(srcN); \ +} + +#define CP_INCR_SH(shl, shr) { \ + dstWord = srcWord SHL shl; \ + srcWord = INC_VAL(srcN); \ + dstWord |= srcWord SHR shr; \ + INC_VAL(dstN) = dstWord; \ +} + + + +/******************************************************************** + ** Macros for copying words of different alignment. + ** Uses array indexes. + *******************************************************************/ + +#define CP_INDEX(idx) { \ + dstN[idx] = srcN[idx]; \ +} + +#define CP_INDEX_SH(x, shl, shr) { \ + dstWord = srcWord SHL shl; \ + srcWord = srcN[x]; \ + dstWord |= srcWord SHR shr; \ + dstN[x] = dstWord; \ +} + + + +/******************************************************************** + ** Macros for copying words of different alignment. + ** Uses incremening pointers or array indexes depending on + ** configuration. + *******************************************************************/ + +#if defined (INDEXED_COPY) + +#define CP(idx) CP_INDEX(idx) +#define CP_SH(idx, shl, shr) CP_INDEX_SH(idx, shl, shr) + +#define INC_INDEX(p, o) ((p) += (o)) + +#else /* INDEXED_COPY */ + +#define CP(idx) CP_INCR() +#define CP_SH(idx, shl, shr) CP_INCR_SH(shl, shr) + +#define INC_INDEX(p, o) + +#endif /* INDEXED_COPY */ + + +#define COPY_REMAINING(count) { \ + START_VAL(dst8); \ + START_VAL(src8); \ + \ + switch (count) { \ + case 7: INC_VAL(dst8) = INC_VAL(src8); \ + case 6: INC_VAL(dst8) = INC_VAL(src8); \ + case 5: INC_VAL(dst8) = INC_VAL(src8); \ + case 4: INC_VAL(dst8) = INC_VAL(src8); \ + case 3: INC_VAL(dst8) = INC_VAL(src8); \ + case 2: INC_VAL(dst8) = INC_VAL(src8); \ + case 1: INC_VAL(dst8) = INC_VAL(src8); \ + case 0: \ + default: break; \ + } \ +} + +#define COPY_NO_SHIFT() { \ + UIntN* dstN = (UIntN*)(dst8 PRE_LOOP_ADJUST); \ + UIntN* srcN = (UIntN*)(src8 PRE_LOOP_ADJUST); \ + size_t length = count / TYPE_WIDTH; \ + \ + while (length & 7) { \ + CP_INCR(); \ + length--; \ + } \ + \ + length /= 8; \ + \ + while (length--) { \ + CP(0); \ + CP(1); \ + CP(2); \ + CP(3); \ + CP(4); \ + CP(5); \ + CP(6); \ + CP(7); \ + \ + INC_INDEX(dstN, 8); \ + INC_INDEX(srcN, 8); \ + } \ + \ + src8 = CAST_TO_U8(srcN, 0); \ + dst8 = CAST_TO_U8(dstN, 0); \ + \ + COPY_REMAINING(count & (TYPE_WIDTH - 1)); \ + \ + return dest; \ +} + + + +#define COPY_SHIFT(shift) { \ + UIntN* dstN = (UIntN*)((((UIntN)dst8) PRE_LOOP_ADJUST) & \ + ~(TYPE_WIDTH - 1)); \ + UIntN* srcN = (UIntN*)((((UIntN)src8) PRE_LOOP_ADJUST) & \ + ~(TYPE_WIDTH - 1)); \ + size_t length = count / TYPE_WIDTH; \ + UIntN srcWord = INC_VAL(srcN); \ + UIntN dstWord; \ + \ + while (length & 7) { \ + CP_INCR_SH(8 * shift, 8 * (TYPE_WIDTH - shift)); \ + length--; \ + } \ + \ + length /= 8; \ + \ + while (length--) { \ + CP_SH(0, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(1, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(2, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(3, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(4, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(5, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(6, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(7, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + \ + INC_INDEX(dstN, 8); \ + INC_INDEX(srcN, 8); \ + } \ + \ + src8 = CAST_TO_U8(srcN, (shift - TYPE_WIDTH)); \ + dst8 = CAST_TO_U8(dstN, 0); \ + \ + COPY_REMAINING(count & (TYPE_WIDTH - 1)); \ + \ + return dest; \ +} + + +/******************************************************************** + ** + ** void *memcpy(void *dest, const void *src, size_t count) + ** + ** Args: dest - pointer to destination buffer + ** src - pointer to source buffer + ** count - number of bytes to copy + ** + ** Return: A pointer to destination buffer + ** + ** Purpose: Copies count bytes from src to dest. + ** No overlap check is performed. + ** + *******************************************************************/ + +void *fast_memcpy(void *dest, const void *src, size_t count) +{ + UInt8* dst8 = (UInt8*)dest; + UInt8* src8 = (UInt8*)src; + + if (count < 8) { + COPY_REMAINING(count); + return dest; + } + + START_VAL(dst8); + START_VAL(src8); + + while (((UIntN)dst8 & (TYPE_WIDTH - 1)) != WHILE_DEST_BREAK) { + INC_VAL(dst8) = INC_VAL(src8); + count--; + } + + switch ((((UIntN)src8) PRE_SWITCH_ADJUST) & (TYPE_WIDTH - 1)) { + case 0: COPY_NO_SHIFT(); break; + case 1: COPY_SHIFT(1); break; + case 2: COPY_SHIFT(2); break; + case 3: COPY_SHIFT(3); break; +#if TYPE_WIDTH > 4 + case 4: COPY_SHIFT(4); break; + case 5: COPY_SHIFT(5); break; + case 6: COPY_SHIFT(6); break; + case 7: COPY_SHIFT(7); break; +#endif + } +} diff --git a/src/save/memcpy.h b/src/save/memcpy.h new file mode 100644 index 0000000..0714823 --- /dev/null +++ b/src/save/memcpy.h @@ -0,0 +1,63 @@ +/******************************************************************** + ** File: memcpy.h + ** + ** Copyright (C) 2005 Daniel Vik + ** + ** This software is provided 'as-is', without any express or implied + ** warranty. In no event will the authors be held liable for any + ** damages arising from the use of this software. + ** Permission is granted to anyone to use this software for any + ** purpose, including commercial applications, and to alter it and + ** redistribute it freely, subject to the following restrictions: + ** + ** 1. The origin of this software must not be misrepresented; you + ** must not claim that you wrote the original software. If you + ** use this software in a product, an acknowledgment in the + ** use this software in a product, an acknowledgment in the + ** product documentation would be appreciated but is not + ** required. + ** + ** 2. Altered source versions must be plainly marked as such, and + ** must not be misrepresented as being the original software. + ** + ** 3. This notice may not be removed or altered from any source + ** distribution. + ** + ** + ** Description: Implementation of the standard library function memcpy. + ** This implementation of memcpy() is ANSI-C89 compatible. + ** + *******************************************************************/ + + +/******************************************************************** + ** Includes for size_t definition + *******************************************************************/ + +#include + + +/******************************************************************** + ** + ** void *memcpy(void *dest, const void *src, size_t count) + ** + ** Args: dest - pointer to destination buffer + ** src - pointer to source buffer + ** count - number of bytes to copy + ** + ** Return: A pointer to destination buffer + ** + ** Purpose: Copies count bytes from src to dest. No overlap check + ** is performed. + ** + *******************************************************************/ + +#ifdef __cplusplus +extern "C" { +#endif + +void *fast_memcpy(void *dest, const void *src, size_t count); + +#ifdef __cplusplus +} +#endif diff --git a/src/save/ufo-roof-buffer-build-task.c b/src/save/ufo-roof-buffer-build-task.c new file mode 100644 index 0000000..bdb7a7d --- /dev/null +++ b/src/save/ufo-roof-buffer-build-task.c @@ -0,0 +1,474 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#include +#include +#include + +#ifdef __APPLE__ +#include +#else +#include +#endif + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" +#include "ufo-roof-build-task.h" + +typedef enum { + BUILD_AUTO = 0, + BUILD_RAW, + BUILD_SINO, + BUILD_UFO +} BuildType; + +struct _UfoRoofBuildTaskPrivate { + gchar *config; // ROOF configuration file name + UfoRoofConfig *cfg; // Parsed ROOF parameters + UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + UfoRoofReadInterface; *rdi; // Reader interfaces, one per socket (no threading) + UfoRoofRead *rd; // Threading interface + + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) + + BuildType build; // What dataset do we build: ROOF sinogram or raw network data + guint number; // Number of datasets to read + gboolean stop; // Stop flag + gboolean simulate; // Indicates if we are running in network or simulation modes + + guint64 announced; // For debugging + guint64 generated; // Total number for control + + struct timespec last_fragment_timestamp; +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) + + + +static GEnumValue build_values[] = { + { BUILD_AUTO, "BUILD_AUTO", "auto" }, + { BUILD_RAW, "BUILD_RAW", "raw" }, + { BUILD_SINO, "BUILD_SINO", "sino" }, + { BUILD_UFO, "BUILD_UFO", "ufo" }, + { 0, NULL, NULL} +}; + +enum { + PROP_0, + PROP_STOP, + PROP_SIMULATE, + PROP_PATH, + PROP_FIRST, + PROP_NUMBER, + PROP_BUILD, + PROP_CONFIG, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_roof_build_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL)); +} + +static void +ufo_roof_build_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + guint i; + GError *gerr = NULL; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + if (!priv->config) + roof_setup_error(error, "ROOF configuration is not specified"); + + priv->cfg = ufo_roof_config_new(priv->config, priv->simulate?UFO_ROOF_CONFIG_SIMULATION:UFO_ROOF_CONFIG_DEFAULT, &gerr); + if (!priv->cfg) + roof_propagate_error(error, gerr, "roof-build-setup: "); + + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr); + } else + priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr); + + if (!priv->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + } + + if (priv->build == BUILD_AUTO) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]); + } + + priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr); + if ((gerr)||(!priv->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); + + priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr); + if (gerr) + roof_propagate_error(error, gerr, "roof_read_new: "); + + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); +} + +static void +ufo_roof_build_task_finalize (GObject *object) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->rd) { + ufo_roof_read_free(priv->rd); + priv->rd = NULL; + } + + if (priv->buf) { + ufo_roof_buffer_free(priv->buf); + priv->buf = NULL; + } + + if (priv->cfg) { + ufo_roof_config_free(priv->cfg); + priv->cfg = NULL; + } + + if (priv->config) { + g_free(priv->config); + priv->config = NULL; + } + + if (priv->path) { + g_free(priv->path); + priv->path = NULL; + } + + G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); +} + + + +static void +ufo_roof_build_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition, + GError **error) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + // FIXME: Can we handle data types more elegant? + if (priv->build == BUILD_RAW) { + guint bytes = priv->cfg->dataset_size; + requisition->n_dims = 1; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + } else if (priv->build == BUILD_SINO) { + guint bytes = priv->cfg->fan_bins * priv->cfg->bit_depth / 8; + requisition->n_dims = 2; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + requisition->dims[1] = priv->cfg->fan_projections; + } else if (priv->build == BUILD_UFO) { + requisition->n_dims = 2; + requisition->dims[0] = priv->cfg->fan_bins; + requisition->dims[1] = priv->cfg->fan_projections; + } +} + +static guint +ufo_roof_build_task_get_num_inputs (UfoTask *task) +{ + return 1; +} + +static guint +ufo_roof_build_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 1; +} + +static UfoTaskMode +ufo_roof_build_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; +} + + +static gboolean +ufo_roof_build_task_generate (UfoTask *task, + UfoBuffer *output, + UfoRequisition *requisition) +{ + gboolean ready = FALSE; + gulong seqid; + GError *gerr = NULL; + GValue ival = G_VALUE_INIT; + GValue lval = G_VALUE_INIT; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + UfoRoofConfig *cfg = priv->cfg; + UfoRoofBuffer *buf = priv->buf; + + void *output_buffer = ufo_buffer_get_host_array(output, NULL); + + if (priv->stop) + return FALSE; + + // FIXME: Wait or break. Not both. + do { + ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr); + if (gerr) roof_print_error(gerr); + + if (!ready) { + ready = ufo_roof_buffer_skip_to_ready(buf); + if (!ready) { + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + return FALSE; + } + } + } while (!ready); + + // FIXME: integrate fastwriter somewhere here? + + if (priv->build == BUILD_UFO) { + switch (cfg->bit_depth) { + case 8: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_8U); + break; + case 16: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_16U); + break; + case 32: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_32U); + break; + default: + printf("Usupported bit-depth %u\n", cfg->bit_depth); + } + } + + // Metadata: plane and sequential number within the plane + g_value_init (&ival, G_TYPE_UINT); + g_value_init (&lval, G_TYPE_ULONG); + if (priv->build != BUILD_UFO) { + g_value_set_uint (&ival, cfg->bit_depth); + ufo_buffer_set_metadata (output, "bpp", &ival); + } + g_value_set_uint (&ival, 1 + seqid % cfg->n_planes); + ufo_buffer_set_metadata (output, "plane", &ival); + g_value_set_ulong (&lval, seqid / cfg->n_planes); + ufo_buffer_set_metadata (output, "plane_id", &lval); + g_value_set_ulong (&lval, seqid); + ufo_buffer_set_metadata (output, "seqid", &lval); + g_value_unset(&lval); + g_value_unset(&ival); + + // FIXME: Or shall we start from counting from the ID of the first registerd dataset + if ((priv->number)&&(buf->current_id >= priv->number)) { +// printf("%lu datasets processed, stopping\n", buf->current_id); + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + } + + if (ready) priv->generated++; + + if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { + if (ready) + printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + else + printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + priv->announced = buf->current_id; + } + + return ready; +} + +static void +ufo_roof_build_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + if (priv->config) g_free(priv->config); + priv->config = g_value_dup_string(value); + break; + case PROP_STOP: + priv->stop = g_value_get_boolean (value); + break; + case PROP_SIMULATE: + priv->simulate = g_value_get_boolean (value); + break; + case PROP_PATH: + if (priv->path) g_free(priv->path); + priv->path = g_value_dup_string(value); + break; + case PROP_FIRST: + priv->first_file_number = g_value_get_uint (value); + break; + case PROP_NUMBER: + priv->number = g_value_get_uint (value); + break; + case PROP_BUILD: + priv->build = g_value_get_enum (value); + if ((priv->build == BUILD_AUTO)&&(priv->cfg)) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + } + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_roof_build_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + g_value_set_string(value, priv->config); + break; + case PROP_STOP: + g_value_set_boolean (value, priv->stop); + break; + case PROP_SIMULATE: + g_value_set_boolean (value, priv->simulate); + break; + case PROP_PATH: + g_value_set_string(value, priv->path?priv->path:""); + break; + case PROP_FIRST: + g_value_set_uint (value, priv->first_file_number); + break; + case PROP_NUMBER: + g_value_set_uint (value, priv->number); + break; + case PROP_BUILD: + g_value_set_enum (value, priv->build); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_roof_build_task_setup; + iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; + iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; + iface->get_mode = ufo_roof_build_task_get_mode; + iface->get_requisition = ufo_roof_build_task_get_requisition; + iface->generate = ufo_roof_build_task_generate; +} + +static void +ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_roof_build_task_set_property; + oclass->get_property = ufo_roof_build_task_get_property; + oclass->finalize = ufo_roof_build_task_finalize; + + properties[PROP_CONFIG] = + g_param_spec_string ("config", + "ROOF configuration", + "Path to ROOF configuration file", + "", + G_PARAM_READWRITE); + + properties[PROP_STOP] = + g_param_spec_boolean ("stop", + "Stop flag", + "Stop socket servers and terminates filter execution", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_SIMULATE] = + g_param_spec_boolean ("simulate", + "Simulation mode", + "Read data from the specified files instead of network", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_PATH] = + g_param_spec_string ("path", + "Input files for simulation mode", + "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)", + "", + G_PARAM_READWRITE); + + properties[PROP_FIRST] = + g_param_spec_uint ("first_file_number", + "Offset to the first read file", + "Offset to the first read file", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_NUMBER] = + g_param_spec_uint("number", + "Number of datasets to receive", + "Number of datasets to receive", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_BUILD] = + g_param_spec_enum ("build", + "Build type (\"raw\", \"sino\", \"ufo\")", + "Build type (\"raw\" - raw data, \"sino\" - arrange in sinogram, \"ufo\" - arrange in sinogram and convert UFO floating-point format)", + g_enum_register_static ("build", build_values), + 0, G_PARAM_READWRITE); + + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); +} + +static void +ufo_roof_build_task_init(UfoRoofBuildTask *self) +{ + self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); +} diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c new file mode 100644 index 0000000..60868d2 --- /dev/null +++ b/src/save/ufo-roof-read-thread.c @@ -0,0 +1,155 @@ +#include +#include + +#include + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) { + int i; + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context"); + + thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size); + if (!thr->rdbuf) { + ufo_roof_read_thread_free(thr); + roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer"); + } + + thr->rd = rd; + thr->from = from; + thr->to = to; + + + return thr; + +} + +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) { + if (!thr) return; + if (thr->rdbuf) free(thr->rdbuf); + + ufo_roof_thread_stop(thr, error); + free(thr); +} + +static int ufo_roof_read_thread_run(void *arg) { + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)arg; + + UfoRoofConfig *cfg = thr->rd->cfg; + UfoRoofBuffer *buf = thr->rd->buf; + UfoRoofReadInterface *rdi = thr->rd->rdi; + + guint from = thr->from; + guint to = thr->to; + + void *rdbuf = thr->rdbuf; + + uint64_t current_id[to - from] = {0}; + uint64_t grabbed[to - from] = {0}; + + static uint64_t errors = 0; + + while (thr->op != UFO_ROOF_OP_STOP) { + for (guint sid = from; sid < to; sid++) { + // FIXME break waiting on stop? If no packets are send + guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + guint ready = false; + const uint8_t *fragment = (uint8_t*)rdbuf; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + +#ifdef UFO_ROOF_DEBUG + if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + + current_id[sid - from] = packet_id; + grabbed[sid - from]++; + if ((grabbed[sid - from]%1000000)==0) + printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000); +#endif + + ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + } // fragment-loop + + // send notification? Broadcast blocks, we don't want it. + if (ready) { + } + + } // socket-loop + } // operation-loop + + +#ifdef UFO_ROOF_DEBUG + // Store first received packet on each channel... + static int debug = 1; + if (debug) { + char fname[256]; + sprintf(fname, "channel%i_packet0.raw", priv->id); + FILE *f = fopen(fname, "w"); + if (f) { + fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f); + fclose(f); + } + debug = 0; + } +#endif /* UFO_ROOF_DEBUG */ + + // FIXME: End of data (shall we restart in the network case?) +// if (!packets) +// return FALSE; + + // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? + header->channel_id = priv->id; +// header->n_packets = packets; + + return TRUE; +} + + +} + +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) { + int err; + if (!thr) return FALSE; + + err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err); + + ctx->launched = TRUE; + return TRUE; +} + +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) { + int err, ret; + if (!thr) return FALSE; + if (!thr->launched) return TRUE; + + // Signal thread termination + + err = thrd_join(&thr->thread, &ret); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err); + + return TRUE; +} + diff --git a/src/save/ufo-roof-read-thread.h b/src/save/ufo-roof-read-thread.h new file mode 100644 index 0000000..ebe8989 --- /dev/null +++ b/src/save/ufo-roof-read-thread.h @@ -0,0 +1,23 @@ +#ifndef __UFO_ROOF_READ_THREAD_H +#define __UFO_ROOF_READ_THREAD_H + +typedef struct _UfoRoofReadThread UfoRoofReadThread; + +#include "ufo-roof-read.h" + +struct _UfoRoofReadThread { + UfoRoofRead *rd; // ROOF Reader Cotext + guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive) + + gboolean launched; // Flag indicating if thread is launched + thrd_t thread; // Thread ID +}; + +/* +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error); +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error); +*/ + +#endif /* __UFO_ROOF_READ_THREAD_H */ \ No newline at end of file diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c new file mode 100644 index 0000000..f3d790d --- /dev/null +++ b/src/save/ufo-roof-read.c @@ -0,0 +1,123 @@ +#include +#include + +#include + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + + +#include +#include +#include +#include + +static void ufo_roof_read_optimize(UfoRoofConfig *cfg) { + // FIXME: request real-time permissions? + // FIXME: do we need this? +/* + uint32_t lat = 0; + int fd = open("/dev/cpu_dma_latency", O_RDWR); + if (fd == -1) { + fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno)); + } else { + write(fd, &lat, sizeof(lat)); + close(fd); + } +*/ +} + + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) { + assert(ctx); + return ctx->settings; +} + + +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) { + guint i; + GError *gerr = NULL; + + UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context"); + + ufo_roof_read_optimize(ctx); + + ctx->n_threads = cfg->n_read_threads; + ctx->cfg = cfg; + ctx->buf = buf; + ctx->rdi = rdi; + + ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread)); + if (!ctx->thr) { + ufo_roof_read_free(ctx); + roof_new_error(error, "Error allocating UfoRoofReadThread contexts"); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++; + ctx->n_threads = n_threads; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads; + if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread; + + guint from, to; + for (i = 0, from = 0; i < n_threads; i++, from = to) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + + return ctx; +} + +void ufo_roof_read_free(UfoRoofRead *ctx) { + if (!ctx) return; + + if (ctx->thr) { + int i; + ufo_roof_read_stop(ctx); + for (i = 0; i < ctx->n_threads; i++) { + if (ctx->thr[i]) + ufo_roof_read_thread_free(ctx->thr[i]); + } + free(ctx->thr); + } + free(ctx); +} + +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) { + gboolean ret; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_start(ctx, &gerr); + if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i); + } + return TRUE; +} + +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) { + gboolean ret, res = FALSE; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_stop(ctx, &gerr); + if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i); + res |= !ret; + } + return !res; +} + + diff --git a/src/save/ufo-roof-read.h b/src/save/ufo-roof-read.h new file mode 100644 index 0000000..16e910b --- /dev/null +++ b/src/save/ufo-roof-read.h @@ -0,0 +1,61 @@ +#ifndef __UFO_ROOF_READ_H +#define __UFO_ROOF_READ_H + +typedef struct _UfoRoofRead UfoRoofRead; + +#include "ufo-roof-config.h" +#include "ufo-roof-buffer.h" +//#include "ufo-roof-read-thread.h" + +G_BEGIN_DECLS + +typedef struct _UfoRoofReadInterface UfoRoofReadInterface; +typedef struct _UfoRoofReadInterfaceSettings UfoRoofReadInterfaceSettings; + +//typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error); + +typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t **buf, GError **error); +typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader); + +struct _UfoRoofReadInterfaceSettings { + guint padding; // Packet size + padding +}; + +struct _UfoRoofReadInterface { + UfoRoofReaderRead read; + UfoRoofReaderClose close; + + UfoRoofReadInterfaceSettings settings; +}; + +typedef enum { + UFO_ROOF_OP_STOP = 0, + UFO_ROOF_OP_READ +} UfoRoofOp; + +struct _UfoRoofReadContext { + UfoRoofConfig *cfg; + UfoRoofBuffer *buf; + UfoRoofReadInterface *rdi; + + gulong packet +// UfoRoofReadThread *thr; + + guint n_threads; + UfoRoofOp op; // Current operation (reading by default) +}; + + + +/* +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error); +void ufo_roof_read_free(UfoRoofRead *ctx); +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error); +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error); + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error); +*/ + +G_END_DECLS + +#endif /* __UFO_ROOF_READ_H */ -- cgit v1.2.3