diff options
author | Suren A. Chilingaryan <csa@suren.me> | 2019-11-17 09:16:57 +0100 |
---|---|---|
committer | Suren A. Chilingaryan <csa@suren.me> | 2019-11-17 09:16:57 +0100 |
commit | 3d93df54d024f49895db6277e873dccd10b5baec (patch) | |
tree | c664797c69e4b4680d04aee7669da03e452e0c5d | |
download | ufo-roof-temp-3d93df54d024f49895db6277e873dccd10b5baec.tar.gz ufo-roof-temp-3d93df54d024f49895db6277e873dccd10b5baec.tar.bz2 ufo-roof-temp-3d93df54d024f49895db6277e873dccd10b5baec.tar.xz ufo-roof-temp-3d93df54d024f49895db6277e873dccd10b5baec.zip |
The first test (file file-base simmulation)
-rw-r--r-- | CMakeLists.txt | 56 | ||||
-rw-r--r-- | build.sh | 1 | ||||
-rw-r--r-- | meson.build | 52 | ||||
-rw-r--r-- | src/CMakeLists.txt | 82 | ||||
-rw-r--r-- | src/DEVELOPMENT | 26 | ||||
-rw-r--r-- | src/kernels/CMakeLists.txt | 8 | ||||
-rw-r--r-- | src/kernels/meson.build | 7 | ||||
-rw-r--r-- | src/meson.build | 45 | ||||
-rw-r--r-- | src/ufo-roof-buffer.c | 122 | ||||
-rw-r--r-- | src/ufo-roof-buffer.h | 31 | ||||
-rw-r--r-- | src/ufo-roof-build-task.c | 325 | ||||
-rw-r--r-- | src/ufo-roof-build-task.h | 53 | ||||
-rw-r--r-- | src/ufo-roof-config.c | 172 | ||||
-rw-r--r-- | src/ufo-roof-config.h | 37 | ||||
-rw-r--r-- | src/ufo-roof-error.h | 58 | ||||
-rw-r--r-- | src/ufo-roof-read-file.c | 84 | ||||
-rw-r--r-- | src/ufo-roof-read-file.h | 8 | ||||
-rw-r--r-- | src/ufo-roof-read-socket.c | 118 | ||||
-rw-r--r-- | src/ufo-roof-read-socket.h | 8 | ||||
-rw-r--r-- | src/ufo-roof-read-task.c | 289 | ||||
-rw-r--r-- | src/ufo-roof-read-task.h | 53 | ||||
-rw-r--r-- | src/ufo-roof-read.h | 17 | ||||
-rw-r--r-- | src/ufo-roof.h | 20 | ||||
-rw-r--r-- | tests/roof.json | 31 | ||||
-rw-r--r-- | tests/roof.py | 46 | ||||
-rwxr-xr-x | tests/roof.sh | 3 | ||||
-rw-r--r-- | tests/roof.yaml | 26 | ||||
-rw-r--r-- | tests/test_file.sh | 11 |
28 files changed, 1789 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..37c38fc --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,56 @@ +cmake_minimum_required(VERSION 2.8) + +project(ufo C CXX) + +set(TARNAME "ufo-roof") + +set(UFO_ROOF_VERSION_MAJOR "0") +set(UFO_ROOF_VERSION_MINOR "0") +set(UFO_ROOF_VERSION_PATCH "1") +set(UFO_ROOF_VERSION_STRING_LONG "${UFO_ROOF_VERSION_MAJOR}.${UFO_ROOF_VERSION_MINOR}.${UFO_ROOF_VERSION_PATCH}") +set(UFO_ROOF_VERSION_STRING_SHORT "${UFO_ROOF_VERSION_MAJOR}.${UFO_ROOF_VERSION_MINOR}") + +set(UFO_DESCRIPTION "UFO roof filters") +set(UFO_DESCRIPTION_SUMMARY "UFO roof filters") + +list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/common/cmake") + +if (APPLE) + set(CMAKE_MACOSX_RPATH "ON") + set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib") +endif() + +include(GNUInstallDirs) +include(PkgConfigVars) + +set(PKG_UFO_CORE_MIN_REQUIRED "0.16") + + +option(WITH_PROFILING "Enable profiling" OFF) + +if (WITH_PROFILING) + add_definitions("-pg") + set(CMAKE_C_FLAGS "-pg") +endif () + +find_package(OpenCL REQUIRED) +find_package(PkgConfig REQUIRED) + +pkg_check_modules(UFO ufo>=${PKG_UFO_CORE_MIN_REQUIRED} REQUIRED) + +pkg_check_variable(ufo plugindir) +pkg_check_variable(ufo kerneldir) + +link_directories(${UFO_LIBRARY_DIRS}) +add_definitions("-Wall -Wextra -fPIC") +add_definitions(-DG_LOG_DOMAIN="Ufo") + +if (CMAKE_COMPILER_IS_GNUCC OR ("${CMAKE_C_COMPILER_ID}" STREQUAL "Clang")) + add_definitions("-Wno-unused-parameter") +endif () +enable_testing() + +#add_subdirectory(docs) +#add_subdirectory(deps) +add_subdirectory(src) +#add_subdirectory(tests) diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..050ef22 --- /dev/null +++ b/build.sh @@ -0,0 +1 @@ +PKG_CONFIG_PATH="/usr/local/lib64/pkgconfig/" meson build diff --git a/meson.build b/meson.build new file mode 100644 index 0000000..b2b5f3a --- /dev/null +++ b/meson.build @@ -0,0 +1,52 @@ +project('ufo-roof', + ['c', 'cpp'], + version: '0.0.1' +) + +version = meson.project_version() +components = version.split('.') +version_major = components[0] +version_minor = components[1] +version_patch = components[2] + +cc = meson.get_compiler('c') + +add_global_arguments( + '-DGLIB_DISABLE_DEPRECATION_WARNINGS', + '-DCL_USE_DEPRECATED_OPENCL_1_1_APIS', + '-DCL_USE_DEPRECATED_OPENCL_1_2_APIS', + language: 'c' +) + +if cc.get_id() == 'gcc' + add_global_arguments( + '-Wno-unused-parameter', + '-fopenmp', + language: ['c', 'cpp']) + add_global_link_arguments('-fopenmp', language: 'c') +endif + +opencl_dep = declare_dependency(dependencies: cc.find_library('OpenCL')) +ufo_dep = dependency('ufo', version: '>= 0.16') + +m_dep = declare_dependency( + dependencies: cc.find_library('m') +) + +plugin_install_dir = ufo_dep.get_pkgconfig_variable('plugindir') +kernel_install_dir = ufo_dep.get_pkgconfig_variable('kerneldir') + +prefixdir = get_option('prefix') +datadir = join_paths(prefixdir, get_option('datadir')) +docdir = join_paths(datadir, 'doc', 'ufo-filters') + +deps = [ + ufo_dep, + opencl_dep, + m_dep, +] + +#subdir('deps') +#subdir('docs') +subdir('src') +#subdir('tests') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..0913528 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,82 @@ +cmake_minimum_required(VERSION 2.6) + +#{{{ Sources +set(ufofilter_SRCS + ufo-roof-read-task.c + ufo-roof-build-task.c + ) + +set(common_SRCS + ufo-roof-config.c + ) + +set(roof_read_aux_SRCS + ufo-roof-read-socket.c + ufo-roof-read-file.c + ) + +set(roof_build_aux_SRCS + ufo-roof-buffer.c + ) + + +file(GLOB ufofilter_KERNELS "kernels/*.cl") +#}}} +#{{{ Variables +set(ufofilter_LIBS + m + ${UFO_LIBRARIES} + ${OpenCL_LIBRARIES}) + +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu99 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations") + +add_definitions(-D_FILE_OFFSET_BITS=64 -D_LARGE_FILES) +#}}} +#{{{ Dependency checks + + +#}}} +#{{{ Plugin targets +include_directories(${CMAKE_CURRENT_BINARY_DIR} + ${CMAKE_CURRENT_SOURCE_DIR} + ${OpenCL_INCLUDE_DIRS} + ${UFO_INCLUDE_DIRS}) + +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.in + ${CMAKE_CURRENT_BINARY_DIR}/config.h) + + +foreach(_src ${ufofilter_SRCS}) + # find plugin suffix + string(REGEX REPLACE "ufo-([^ \\.]+)-task.*" "\\1" task "${_src}") + + # build string to get auxalleanous sources + string(REPLACE "-" "_" _aux ${task}) + string(TOUPPER ${_aux} _aux_upper) + + # create an option name and add this to disable filters + set(_aux_src "${_aux}_aux_SRCS") + set(_aux_libs "${_aux}_aux_LIBS") + + string(REPLACE "-" "" _targetname ${task}) + set(target "ufofilter${_targetname}") + + # build single shared library per filter + if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") + add_library(${target} MODULE ${_src} ${common_SRCS} ${${_aux_src}}) + else() + add_library(${target} SHARED ${_src} ${common_SRCS} ${${_aux_src}}) + endif() + + target_link_libraries(${target} ${ufofilter_LIBS} ${${_aux_libs}} ufoaux) + + list(APPEND all_targets ${target}) + + install(TARGETS ${target} + ARCHIVE DESTINATION ${UFO_PLUGINDIR} + LIBRARY DESTINATION ${UFO_PLUGINDIR}) +endforeach() +#}}} +#{{{ Subdirectories +add_subdirectory(kernels) +#}}} diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT new file mode 100644 index 0000000..f3381fb --- /dev/null +++ b/src/DEVELOPMENT @@ -0,0 +1,26 @@ +Architecture +=========== + - Current implementation follows UFO architecture: reader and dataset-builder are split in two filters. + * The reader is multi-threaded. However, only a single instance of the builder is possible to schedule. + This could limit maximum throughput on dual-head or even signle-head, but many-core systems. + * Another problem here is timing. All events in the builder are initiaded from the reader. Consequently, + as it seems we can't timeout on semi-complete dataset if no new data is arriving. + * Besides, performance this is also critical for stability. With continuous streaming there is no problem, + however, if a finite number of frames requested and some packets are lost, the software will wait forever + for missing bits. + + +Questions +========= + - Can we pre-allocate several UFO buffers for forth-comming events. Currently, we need to buffer out-of-order + packets and copy them later (or buffer everything for simplicity). We can avoid this data copy if we can get + at least one packet in advance. + + - How I can execute 'generate' method on 'reductor' filter if no new data on the input for the specified + amount of time. One option is sending empty buffer with metadata indicating timeout. But this is again + hackish. + + - Can we use 16-bit buffers? I can set dimmensions to 1/4 of the correct value to address this. But is it + possible to do in a clean way? + + - What is 'ufotools' python package mentioned in documentation? Just a typo? diff --git a/src/kernels/CMakeLists.txt b/src/kernels/CMakeLists.txt new file mode 100644 index 0000000..e7df764 --- /dev/null +++ b/src/kernels/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 2.6) + +# copy kernels +file(GLOB ufofilter_KERNELS "*.cl") + +foreach(_kernel ${ufofilter_KERNELS}) + install(FILES ${_kernel} DESTINATION ${UFO_KERNELDIR}) +endforeach() diff --git a/src/kernels/meson.build b/src/kernels/meson.build new file mode 100644 index 0000000..9803fb9 --- /dev/null +++ b/src/kernels/meson.build @@ -0,0 +1,7 @@ +kernel_files = [ +] + +install_data(kernel_files, + install_dir: kernel_install_dir, +) + diff --git a/src/meson.build b/src/meson.build new file mode 100644 index 0000000..8f3529e --- /dev/null +++ b/src/meson.build @@ -0,0 +1,45 @@ +plugins = [ + 'roof-read', + 'roof-build', +] + +roof_common_src = [ + 'ufo-roof-config.c', +] + +roof_plugin_src = { + 'roof-read': [ + 'ufo-roof-read-socket.c', + 'ufo-roof-read-file.c', + ], + 'roof-build': [ + 'ufo-roof-buffer.c', + ], +} + +# standard plugins + +foreach plugin: plugins + name = ''.join(plugin.split('-')) + + sources = roof_common_src + [ + 'ufo-@0@-task.c'.format(plugin), + ] + + if plugin in roof_plugin_src + sources += roof_plugin_src[plugin] + endif + + shared_module(name, + 'ufo-@0@-task.c'.format(plugin), + dependencies: deps, + name_prefix: 'libufofilter', + install: true, + install_dir: plugin_install_dir, + sources: sources + ) +endforeach + + + +subdir('kernels') diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c new file mode 100644 index 0000000..f071481 --- /dev/null +++ b/src/ufo-roof-buffer.c @@ -0,0 +1,122 @@ +#include <stdio.h> +#include <stdint.h> + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" + +// This is currently not thread safe. With dual-filter architecture this will be called sequentially. + +UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) { + UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer)); + if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); + + buffer->ring_size = cfg->buffer_size; + buffer->fragment_size = cfg->payload_size; + buffer->dataset_size = cfg->dataset_size; + buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; + buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams; +// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size); + + buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size); + buffer->n_fragments = (_Atomic int*)calloc(buffer->ring_size, sizeof(_Atomic int)); + buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); + + if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { + ufo_roof_buffer_free(buffer); + roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); + } + + return buffer; +} + +void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { + if (buffer) { + if (buffer->ring_buffer) + free(buffer->ring_buffer); + if (buffer->n_fragments) + free(buffer->n_fragments); + if (buffer->stream_fragment) + free(buffer->stream_fragment); + + free(buffer); + } +} + + // fragment_id is numbered from 1 (0 - means auto) +gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) { + guint buffer_id; + guint dataset_id; + + if (!fragment_id) { + fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required) + dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; + fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; + buffer_id = dataset_id % buffer->ring_size; + + // Late arrived packed +// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id); + if (dataset_id < buffer->current_id) + roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + + // We are not fast enough, new packets are arrvining to fast + if (dataset_id >= (buffer->current_id + buffer->ring_size)) { + // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? + + if (error) + root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed", + buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + + // FIXME: Send semi-complete buffers further? + // FIXME: Or shall we drop more if larger buffers are allocated? + for (int i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++) + buffer->n_fragments[i%buffer->ring_size] = 0; + buffer->current_id = dataset_id - buffer->ring_size + 1; + + // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. + // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value + // The updates may happen after writting/reading is finished. + } + + // FIXME: This is builds events as it read from file in roof v.1 code. We can assemble fan projections directly here. + void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; + void *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size; + +/* printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n", + buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size, + ((uint32_t*)fragment)[0] + );*/ + memcpy(fragment_buffer, fragment, buffer->fragment_size); + + // FIXME: Sanity checks: verify is not a dublicate fragment? + atomic_fetch_add(&buffer->n_fragments[buffer_id], 1); + + if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + if (dataset_id == buffer->current_id) { + return TRUE; + } + } + + return FALSE; +} + + + +gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error) { + guint buffer_id = buffer->current_id % buffer->ring_size; + void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; + + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE; + + memcpy(output_buffer, dataset_buffer, buffer->dataset_size); + buffer->n_fragments[buffer_id] = 0; + buffer->current_id += 1; + + return TRUE; +} diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h new file mode 100644 index 0000000..bb71791 --- /dev/null +++ b/src/ufo-roof-buffer.h @@ -0,0 +1,31 @@ +#ifndef __UFO_ROOF_BUFFER_H +#define __UFO_ROOF_BUUFER_H + +#include <stdatomic.h> + +struct _UfoRoofBuffer { + guint current_id; // The ID of the first (active) dataset in the buffer + + guint ring_size; // Number of datasets to buffer + void *ring_buffer; // The ring buffer + _Atomic int *n_fragments; // Number of completed fragments in each buffer + guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams) +// int *fragments; // Mark individual completed fragments (if we care for partial data) + + + guint dataset_size; // Size (in bytes) of a full dataset + guint fragment_size; // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment) + + guint fragments_per_dataset; // Number of packets in dataset (used to compute when dataset is ready) + guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready) +}; + +typedef struct _UfoRoofBuffer UfoRoofBuffer; + +UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error); +void ufo_roof_buffer_free(UfoRoofBuffer *buf); + +gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error); +gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error); + +#endif diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c new file mode 100644 index 0000000..81c84ce --- /dev/null +++ b/src/ufo-roof-build-task.c @@ -0,0 +1,325 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdio.h> + +#ifdef __APPLE__ +#include <OpenCL/cl.h> +#else +#include <CL/cl.h> +#endif + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" +#include "ufo-roof-build-task.h" + + +struct _UfoRoofBuildTaskPrivate { + gchar *config; // ROOF configuration file name + UfoRoofConfig *cfg; // Parsed ROOF parameters + UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + + guint number; // Number of datasets to read + gboolean stop; // Stop flag +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) + +enum { + PROP_0, + PROP_STOP, + PROP_NUMBER, + PROP_CONFIG, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_roof_build_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL)); +} + +static void +ufo_roof_build_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + GError *gerr = NULL; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + if (!priv->config) + roof_setup_error(error, "ROOF configuration is not specified"); + + priv->cfg = ufo_roof_config_new(priv->config, &gerr); + if (!priv->cfg) + roof_propagate_error(error, gerr, "roof-build-setup: "); + + + priv->buf = ufo_roof_buffer_new(priv->cfg, &gerr); + if (!priv->buf) + roof_propagate_error(error, gerr, "roof-build-setup: "); +} + +static void +ufo_roof_build_task_finalize (GObject *object) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->buf) { + ufo_roof_buffer_free(priv->buf); + priv->buf = NULL; + } + + if (priv->cfg) { + ufo_roof_config_free(priv->cfg); + priv->cfg = NULL; + } + + if (priv->config) { + g_free(priv->config); + priv->config = NULL; + } + + + G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); +} + + + +static void +ufo_roof_build_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition, + GError **error) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + guint bytes = priv->cfg->dataset_size; + + // FIXME: Can this be made more elegant? + requisition->n_dims = 1; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + +} + +static guint +ufo_roof_build_task_get_num_inputs (UfoTask *task) +{ + return 1; +} + +static guint +ufo_roof_build_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 1; +} + +static UfoTaskMode +ufo_roof_build_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR; +} + +static gboolean +ufo_roof_build_task_process (UfoTask *task, + UfoBuffer **inputs, + UfoBuffer *output, + UfoRequisition *requisition) +{ + GError *gerr = NULL; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + UfoRoofConfig *cfg = priv->cfg; + UfoRoofBuffer *buf = priv->buf; + gboolean ready = FALSE; + +// UfoRequisition in_req; +// ufo_buffer_get_requisition (inputs[0], &in_req); + + void *data = ufo_buffer_get_host_array(inputs[0], NULL); + UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg); + + if (priv->stop) + return FALSE; + + for (int i = 0; i < header->n_packets; i++) { + int packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(data); + packet_id = pheader->packet_id + 1; + } + + ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, data, &gerr); + if (gerr) roof_print_error(gerr); + + data += cfg->max_packet_size; + } + + // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one? + +// printf("proc (%s) - channel: %i, packets: %i\n", ready?"yes":" no", header->channel_id, header->n_packets); + + return !ready; +} + +static gboolean +ufo_roof_build_task_generate (UfoTask *task, + UfoBuffer *output, + UfoRequisition *requisition) +{ + gboolean ready = FALSE; + GError *gerr = NULL; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); +// UfoRoofConfig *cfg = priv->cfg; + UfoRoofBuffer *buf = priv->buf; + + void *output_buffer = ufo_buffer_get_host_array(output, NULL); + + if (priv->stop) + return FALSE; + + ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &gerr); + if (gerr) roof_print_error(gerr); + + if ((priv->number)&&(buf->current_id >= priv->number)) { + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + } + +// printf("gen(%s) %i\n", ready?"yes":" no", buf->current_id); + + return ready; +} + +static void +ufo_roof_build_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + if (priv->config) g_free(priv->config); + priv->config = g_value_dup_string(value); + break; + case PROP_STOP: + priv->stop = g_value_get_boolean (value); + break; + case PROP_NUMBER: + priv->number = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_roof_build_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + g_value_set_string(value, priv->config); + break; + case PROP_STOP: + g_value_set_boolean (value, priv->stop); + break; + case PROP_NUMBER: + g_value_set_uint (value, priv->number); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_roof_build_task_setup; + iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; + iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; + iface->get_mode = ufo_roof_build_task_get_mode; + iface->get_requisition = ufo_roof_build_task_get_requisition; + iface->process = ufo_roof_build_task_process; + iface->generate = ufo_roof_build_task_generate; +} + +static void +ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_roof_build_task_set_property; + oclass->get_property = ufo_roof_build_task_get_property; + oclass->finalize = ufo_roof_build_task_finalize; + + properties[PROP_CONFIG] = + g_param_spec_string ("config", + "ROOF configuration", + "Path to ROOF configuration file", + "", + G_PARAM_READWRITE); + + properties[PROP_STOP] = + g_param_spec_boolean ("stop", + "Stop flag", + "Stop socket servers and terminates filter execution", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_NUMBER] = + g_param_spec_uint("number", + "Number of datasets to receive", + "Number of datasets to receive", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); +} + +static void +ufo_roof_build_task_init(UfoRoofBuildTask *self) +{ + self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); +} diff --git a/src/ufo-roof-build-task.h b/src/ufo-roof-build-task.h new file mode 100644 index 0000000..cefb7ab --- /dev/null +++ b/src/ufo-roof-build-task.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UFO_ROOF_BUILD_TASK_H +#define __UFO_ROOF_BUILD_TASK_H + +#include <ufo/ufo.h> + +G_BEGIN_DECLS + +#define UFO_TYPE_ROOF_BUILD_TASK (ufo_roof_build_task_get_type()) +#define UFO_ROOF_BUILD_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTask)) +#define UFO_IS_ROOF_BUILD_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ROOF_BUILD_TASK)) +#define UFO_ROOF_BUILD_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskClass)) +#define UFO_IS_ROOF_BUILD_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ROOF_BUILD_TASK)) +#define UFO_ROOF_BUILD_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskClass)) + +typedef struct _UfoRoofBuildTask UfoRoofBuildTask; +typedef struct _UfoRoofBuildTaskClass UfoRoofBuildTaskClass; +typedef struct _UfoRoofBuildTaskPrivate UfoRoofBuildTaskPrivate; + +struct _UfoRoofBuildTask { + UfoTaskNode parent_instance; + + UfoRoofBuildTaskPrivate *priv; +}; + +struct _UfoRoofBuildTaskClass { + UfoTaskNodeClass parent_class; +}; + +UfoNode *ufo_roof_build_task_new (void); +GType ufo_roof_build_task_get_type (void); + +G_END_DECLS + +#endif diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c new file mode 100644 index 0000000..11f8bd4 --- /dev/null +++ b/src/ufo-roof-config.c @@ -0,0 +1,172 @@ +#include <stdio.h> +#include <stdint.h> + +#include <json-glib/json-glib.h> + +#include <ufo/ufo.h> + +#include "ufo-roof-error.h" +#include "ufo-roof-config.h" + +#define roof_config_node_get_with_default(var, parent, type, name, default) do { \ + JsonNode *node = json_object_get_member(parent, name); \ + if (node) var = json_node_get_##type(node); \ + else var = default; \ + } while(0) + +#define roof_config_node_get_string_with_default(var, parent, name, default) do { \ + const gchar *str; \ + JsonNode *node = json_object_get_member(parent, name); \ + if (node) str = json_node_get_string(node); \ + else str = default; \ + if (var != str) { \ + if (var) g_free(var); \ + var = g_strdup(str); \ + } \ + } while(0) + +#define roof_config_node_get(var, parent, type, name) \ + roof_config_node_get_with_default(var, parent, type, name, var) + +#define roof_config_node_get_string(var, parent, name) \ + roof_config_node_get_string_with_default(var, parent, name, var) + + +typedef struct { + UfoRoofConfig cfg; + + JsonParser *parser; +} UfoRoofConfigPrivate; + +void ufo_roof_config_free(UfoRoofConfig *cfg) { + if (cfg) { + UfoRoofConfigPrivate *priv = (UfoRoofConfigPrivate*)cfg; + + if (cfg->path) + g_free(cfg->path); + + if (priv->parser) + g_object_unref (priv->parser); + + free(cfg); + } +} + +UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { + UfoRoofConfigPrivate *priv; + UfoRoofConfig *cfg; + +// JsonNode *node; + JsonObject *root = NULL; + JsonObject *hardware = NULL; + JsonObject *network = NULL; + JsonObject *performance = NULL; + JsonObject *simulation = NULL; + + GError *gerr = NULL; + + priv = (UfoRoofConfigPrivate*)malloc(sizeof(UfoRoofConfigPrivate)); + if (!priv) roof_new_error(error, "Can't allocate UfoRoofConfig"); + + memset(priv, 0, sizeof(UfoRoofConfigPrivate)); + + // Set defaults + cfg = &priv->cfg; + + cfg->port = 4000; + cfg->n_streams = 1; + cfg->protocol = "udp"; + cfg->network_timeout = 10000000; + cfg->header_size = 0; + cfg->payload_size = 0; + cfg->max_packet_size = 0; + cfg->max_packets = 100; + cfg->dataset_size = 0; + cfg->buffer_size = 2; + cfg->path = NULL; + + // Read configuration + priv->parser = json_parser_new_immutable (); + json_parser_load_from_file (priv->parser, config, &gerr); + + if (gerr != NULL) { + g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config); + ufo_roof_config_free(cfg); + return NULL; + } + + root = json_node_get_object (json_parser_get_root (priv->parser)); + + if (root) { + roof_config_node_get(hardware, root, object, "hardware"); + roof_config_node_get(network, root, object, "network"); + roof_config_node_get(simulation, root, object, "simulation"); + } + + if (hardware) { + // FIXME: Compute dataset size based on roof hardware + } + + if (network) { +// int max_packet_size = 0; + + roof_config_node_get(cfg->port, network, int, "port"); + roof_config_node_get(cfg->n_streams, network, int, "streams"); + + roof_config_node_get(cfg->max_packet_size, network, int, "max_packet_size"); + // FIXME: compute payload_size based on sample_size + roof_config_node_get(cfg->payload_size, network, int, "payload_size"); + roof_config_node_get(cfg->header_size, network, int, "header_size"); + roof_config_node_get(cfg->dataset_size, network, int, "dataset_size"); + } + + if (performance) { + roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once"); + roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size"); + } + + if (simulation) { + roof_config_node_get_string(cfg->path, simulation, "path"); + roof_config_node_get(cfg->first_file_number, simulation, int, "first_file_number"); + } + + // Check configuration consistency + if (!cfg->payload_size) { + ufo_roof_config_free(cfg); + roof_new_error(error, "Packet size is not set"); + } + + if ((!cfg->header_size)&&(!cfg->path)) { + if (!strncmp(cfg->protocol, "udp", 3)) { + // Error if 0 implicitely set, use default value otherwise + if ((network)&&(json_object_get_member(network, "header_size"))) { + ufo_roof_config_free(cfg); + roof_new_error(error, "The header with packet ids is required for un-ordered protocols"); + } else { + cfg->header_size = sizeof(uint32_t); + } + } + } + + guint fragments_per_dataset = cfg->dataset_size / cfg->payload_size; + guint fragments_per_stream = fragments_per_dataset / cfg->n_streams; + + if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) { + ufo_roof_config_free(cfg); + roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams); + } + + if (cfg->buffer_size * fragments_per_stream < cfg->max_packets) { + cfg->max_packets = cfg->buffer_size * fragments_per_stream / 2; + } + + // Finalize configuration + if (!cfg->max_packet_size) + cfg->max_packet_size = cfg->header_size + cfg->payload_size; + + if (!cfg->dataset_size) + cfg->dataset_size = cfg->payload_size; + + + return cfg; +} diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h new file mode 100644 index 0000000..a22c84f --- /dev/null +++ b/src/ufo-roof-config.h @@ -0,0 +1,37 @@ +#ifndef __UFO_ROOF_CONFIG_H +#define __UFO_ROOF_CONFIG_H + +#include <glib.h> + +typedef struct { + gchar *path; // Location of data files for simmulation purposes (i.e. reading a sequence of files instead listening on the corresponding ports) + guint first_file_number; // Indicates if the numbering of files starts at 0 or 1 + gchar *protocol; // Protocols: tcp, udp, tcp6, udp6, ... + guint port; // First port + guint n_streams; // Number of independent data streams (expected on sequential ports) + guint header_size; // Expected size of the packet header, for dgram protocols we need at least 32-bit sequence number. Defaults to uint32_t for udp* and 0 - otherwise + guint payload_size; // Expected size of TCP/UDP packet (without header) + guint dataset_size; // Size of a single dataset (image, sinogram, etc.). This is real size in bytes, excluding all technical headers used in communication protocol. + +//? + + +/* + guint pixels_per_module; + guint planes_per_module; + guint samples_per_dataset; +*/ + + guint max_packets; // limits maximum number of packets which are read at once + guint max_packet_size; // payload_size + header_size + ...? + guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected + guint network_timeout; // Maximum time (us) to wait for data on the socket + + + +} UfoRoofConfig; + +UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error); +void ufo_roof_config_free(UfoRoofConfig *cfg); + +#endif /* __UFO_ROOF_CONFIG_H */ diff --git a/src/ufo-roof-error.h b/src/ufo-roof-error.h new file mode 100644 index 0000000..ed0ae2b --- /dev/null +++ b/src/ufo-roof-error.h @@ -0,0 +1,58 @@ +#ifndef __UFO_ROOF_ERROR_H +#define __UFO_ROOF_ERROR_H + +#include <ufo/ufo.h> + + +#define roof_print_error(error) do { \ + g_warning("%s", error->message); \ + g_error_free(error); \ + error = NULL; \ + } while (0) + +#define roof_set_error(error, type, msg...) do { \ + if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \ + } while (0) + +#define roof_error(error, type, msg...) do { \ + if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \ + return; \ + } while (0) + +#define roof_propagate_error(error, err, msg...) do { \ + g_propagate_prefixed_error(error, err, msg); \ + return; \ + } while (0) + +#define roof_error_with_retval(error, retval, type, msg...) do { \ + if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \ + return retval; \ + } while (0) + +#define roof_propagate_error_with_retval(error, retval, err, msg...) do { \ + g_propagate_prefixed_error(error, err, msg); \ + return retval; \ + } while (0) + + +#define roof_setup_error(error, msg...) \ + roof_error(error, SETUP, msg) + +#define roof_new_error(error, msg...) \ + roof_error_with_retval(error, NULL, SETUP, msg) + + +#define roof_network_error(error, msg...) \ + roof_error(error, SETUP, msg) + +#define root_set_network_error(error, msg...) \ + roof_set_error(error, SETUP, msg) + +#define roof_network_error_with_retval(error, retval, msg...) \ + roof_error_with_retval(error, retval, SETUP, msg) + +#define roof_memory_error(error, msg...) \ + roof_error(error, SETUP, msg) + + +#endif /* __UFO_ROOF_ERROR_H */ diff --git a/src/ufo-roof-read-file.c b/src/ufo-roof-read-file.c new file mode 100644 index 0000000..de8391e --- /dev/null +++ b/src/ufo-roof-read-file.c @@ -0,0 +1,84 @@ +#include <stdio.h> +#include <errno.h> +#include <stdint.h> + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-read-file.h" + +typedef struct { + UfoRoofReadInterface iface; + + UfoRoofConfig *cfg; + + gchar *fname; + FILE *fd; +} UfoRoofReadFile; + +static void ufo_roof_read_file_free(UfoRoofReadInterface *iface) { + UfoRoofReadFile *reader = (UfoRoofReadFile*)iface; + + if (reader) { + if (reader->fname) + g_free(reader->fname); + + if (reader->fd >= 0) + fclose(reader->fd); + + free(reader); + } +} + +static guint ufo_roof_read_file(UfoRoofReadInterface *iface, void *buffers, GError **error) { + UfoRoofReadFile *reader = (UfoRoofReadFile*)iface; + UfoRoofConfig *cfg = reader->cfg; + + size_t bytes = 0; + size_t packet_size = cfg->header_size + cfg->payload_size; + size_t expected = cfg->max_packets * packet_size; + + while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) { + size_t ret = fread(buffers + bytes, 1, expected - bytes, reader->fd); + bytes += ret; + } + + guint packets = bytes / packet_size; + + if (ferror(reader->fd)) { + roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd)); + } else if ((feof(reader->fd))&&(bytes % packet_size)) { + roof_network_error_with_retval(error, packets, "extra data in the end of input"); + } + + return packets; +} + + +UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, guint id, GError **error) { + UfoRoofReadFile *reader = (UfoRoofReadFile*)calloc(1, sizeof(UfoRoofReadFile)); + if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadFile"); + + // FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now. + if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size) + roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if UfoRoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size); + + reader->cfg = cfg; + reader->iface.close = ufo_roof_read_file_free; + reader->iface.read =ufo_roof_read_file; + + reader->fname = g_strdup_printf(cfg->path, id + cfg->first_file_number); + if (!reader->fname) { + free(reader); + roof_new_error(error, "Can't build file name"); + } + + reader->fd = fopen(reader->fname, "rb"); + if (!reader->fd) { + g_free(reader->fname); + g_free(reader); + roof_new_error(error, "Can't open file %s", reader->fname); + } + + return (UfoRoofReadInterface*)reader; +} diff --git a/src/ufo-roof-read-file.h b/src/ufo-roof-read-file.h new file mode 100644 index 0000000..54bcf49 --- /dev/null +++ b/src/ufo-roof-read-file.h @@ -0,0 +1,8 @@ +#ifndef __UFO_ROOF_READ_FILE_H +#define __UFO_ROOF_READ_FILE_H + +#include "ufo-roof-read.h" + +UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, guint id, GError **error); + +#endif diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c new file mode 100644 index 0000000..b72e9d0 --- /dev/null +++ b/src/ufo-roof-read-socket.c @@ -0,0 +1,118 @@ +#define _GNU_SOURCE + +#include <stdio.h> +#include <unistd.h> +#include <errno.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-read-socket.h" + +typedef struct { + UfoRoofReadInterface iface; + + UfoRoofConfig *cfg; + int socket; +} UfoRoofReadSocket; + +static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) { + UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface; + + if (reader) { + if (reader->socket >= 0) + close(reader->socket); + free(reader); + } +} + +static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, void *buf, GError **error) { + struct timespec timeout_ts; + + UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface; + UfoRoofConfig *cfg = reader->cfg; + + struct mmsghdr msg[cfg->max_packets]; + struct iovec msgvec[cfg->max_packets]; + + timeout_ts.tv_sec = cfg->network_timeout / 1000000; + timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000); + + // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build? + memset(msg, 0, sizeof(msg)); + memset(msgvec, 0, sizeof(msgvec)); + for (int i = 0; i < cfg->max_packets; i++) { + msgvec[i].iov_base = buf + i * cfg->max_packet_size; + msgvec[i].iov_len = cfg->max_packet_size; + msg[i].msg_hdr.msg_iov = &msgvec[i]; + msg[i].msg_hdr.msg_iovlen = 1; + } + + int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); + if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); + + // FIXME: Shall we verify packets consistency here? We can check at least the sizes... + + return (guint)packets; +} + + +UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) { + int err; + int port = cfg->port + id; + char port_str[16]; + const char *addr_str = "0.0.0.0"; + struct addrinfo sockaddr_hints; + struct addrinfo *sockaddr_info; + + UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket)); + if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket"); + + reader->cfg = cfg; + reader->iface.close = ufo_roof_read_socket_free; + reader->iface.read =ufo_roof_read_socket; + + snprintf(port_str, sizeof(port_str), "%d", port); + port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0'; + + memset(&sockaddr_hints, 0, sizeof(sockaddr_hints)); + if (!strncmp(cfg->protocol, "udp", 3)) { + sockaddr_hints.ai_family = AF_UNSPEC; + sockaddr_hints.ai_socktype = SOCK_DGRAM; + sockaddr_hints.ai_protocol = IPPROTO_UDP; + } else if (!strncmp(cfg->protocol, "tcp", 3)) { + sockaddr_hints.ai_family = AF_UNSPEC; + sockaddr_hints.ai_socktype = SOCK_STREAM; + sockaddr_hints.ai_protocol = IPPROTO_TCP; + } else { + roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol); + } + + err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); + if (err || !sockaddr_info) { + free(reader); + roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str); + } + + reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol); + if(reader->socket == -1) { + freeaddrinfo(sockaddr_info); + free(reader); + roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str); + } + + err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); + if(err != 0) { + freeaddrinfo(sockaddr_info); + close(reader->socket); + free(reader); + roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str); + } + + freeaddrinfo(sockaddr_info); + + return (UfoRoofReadInterface*)reader; +} diff --git a/src/ufo-roof-read-socket.h b/src/ufo-roof-read-socket.h new file mode 100644 index 0000000..74b0742 --- /dev/null +++ b/src/ufo-roof-read-socket.h @@ -0,0 +1,8 @@ +#ifndef __UFO_ROOF_READ_SOCKET_H +#define __UFO_ROOF_READ_SOCKET_H + +#include "ufo-roof-read.h" + +UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error); + +#endif diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c new file mode 100644 index 0000000..ebff9de --- /dev/null +++ b/src/ufo-roof-read-task.c @@ -0,0 +1,289 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#define _GNU_SOURCE + +#include <stdio.h> + +#ifdef __APPLE__ +#include <OpenCL/cl.h> +#else +#include <CL/cl.h> +#endif + +#include "ufo-roof.h" +#include "ufo-roof-read-socket.h" +#include "ufo-roof-read-file.h" +#include "ufo-roof-read-task.h" + +struct _UfoRoofReadTaskPrivate { + gchar *config; // ROOF configuration file name + UfoRoofConfig *cfg; // Parsed ROOF parameters + UfoRoofReadInterface *reader; + + guint id; // Reader ID (defince sequential port number) + gboolean stop; // Flag requiring termination +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoRoofReadTask, ufo_roof_read_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ROOF_READ_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskPrivate)) + +enum { + PROP_0, + PROP_ID, + PROP_STOP, + PROP_CONFIG, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_roof_read_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ROOF_READ_TASK, NULL)); +} + +static void +ufo_roof_read_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + GError *gerr = NULL; + + UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task); + + if (!priv->config) + roof_setup_error(error, "ROOF configuration is not specified"); + + priv->cfg = ufo_roof_config_new(priv->config, &gerr); + if (!priv->cfg) roof_propagate_error(error, gerr, "roof_config_new: "); + + // Consistency checks + if (priv->id >= priv->cfg->n_streams) + roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams); + + // Start actual reader + if (priv->cfg->path) + priv->reader = ufo_roof_read_file_new(priv->cfg, priv->id, &gerr); + else + priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr); + + if (!priv->reader) + roof_propagate_error(error, gerr, "roof_read_new: "); + +} + + +static void +ufo_roof_read_task_finalize (GObject *object) +{ + UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object); + + if (priv->reader) { + priv->reader->close(priv->reader); + } + + if (priv->cfg) { + ufo_roof_config_free(priv->cfg); + priv->cfg = NULL; + } + + if (priv->config) { + g_free(priv->config); + priv->config = NULL; + } + + G_OBJECT_CLASS (ufo_roof_read_task_parent_class)->finalize (object); +} + +static void +ufo_roof_read_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition, + GError **error) +{ + UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task); + UfoRoofConfig *cfg = priv->cfg; + + guint bytes = cfg->max_packets * cfg->max_packet_size + sizeof(UfoRoofPacketBlockHeader); + + // FIXME: Can this be made more elegant? + requisition->n_dims = 1; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); +} + +static guint +ufo_roof_read_task_get_num_inputs (UfoTask *task) +{ + return 0; +} + +static guint +ufo_roof_read_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 0; +} + +static UfoTaskMode +ufo_roof_read_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; +} + + +static gboolean +ufo_roof_read_task_generate (UfoTask *task, + UfoBuffer *output, + UfoRequisition *requisition) +{ + GError *gerr = NULL; + UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task); + UfoRoofConfig *cfg = priv->cfg; + + void *output_buffer = ufo_buffer_get_host_array(output, NULL); + UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg); + + if (priv->stop) + return FALSE; + + guint packets = priv->reader->read(priv->reader, output_buffer, &gerr); + if (gerr) { + g_warning("Error reciving data: %s", gerr->message); + return FALSE; + } + + // FIXME: End of data (shall we restart in the network case?) + if (!packets) + return FALSE; + + // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? + header->channel_id = priv->id; + header->n_packets = packets; + + return TRUE; +} + +static void +ufo_roof_read_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + if (priv->config) g_free(priv->config); + priv->config = g_value_dup_string(value); + break; + case PROP_ID: + priv->id = g_value_get_uint (value); + break; + case PROP_STOP: + priv->stop = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_roof_read_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + g_value_set_string(value, priv->config); + break; + case PROP_ID: + g_value_set_uint (value, priv->id); + break; + case PROP_STOP: + g_value_set_boolean (value, priv->stop); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_roof_read_task_setup; + iface->get_num_inputs = ufo_roof_read_task_get_num_inputs; + iface->get_num_dimensions = ufo_roof_read_task_get_num_dimensions; + iface->get_mode = ufo_roof_read_task_get_mode; + iface->get_requisition = ufo_roof_read_task_get_requisition; + iface->generate = ufo_roof_read_task_generate; +} + +static void +ufo_roof_read_task_class_init (UfoRoofReadTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_roof_read_task_set_property; + oclass->get_property = ufo_roof_read_task_get_property; + oclass->finalize = ufo_roof_read_task_finalize; + + properties[PROP_CONFIG] = + g_param_spec_string ("config", + "ROOF configuration", + "Path to ROOF configuration file", + "", + G_PARAM_READWRITE); + + properties[PROP_ID] = + g_param_spec_uint ("id", + "Reader ID", + "ID for multi-port servers", + 0, G_MAXUINT, 1, + G_PARAM_READWRITE); + + properties[PROP_STOP] = + g_param_spec_boolean ("stop", + "Stop flag", + "Stop socket servers and terminates filter execution", + FALSE, + G_PARAM_READWRITE); + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoRoofReadTaskPrivate)); +} + +static void +ufo_roof_read_task_init(UfoRoofReadTask *self) +{ + self->priv = UFO_ROOF_READ_TASK_GET_PRIVATE(self); +} diff --git a/src/ufo-roof-read-task.h b/src/ufo-roof-read-task.h new file mode 100644 index 0000000..2fe45c4 --- /dev/null +++ b/src/ufo-roof-read-task.h @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2011-2013 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __UFO_ROOF_READ_TASK_H +#define __UFO_ROOF_READ_TASK_H + +#include <ufo/ufo.h> + +G_BEGIN_DECLS + +#define UFO_TYPE_ROOF_READ_TASK (ufo_roof_read_task_get_type()) +#define UFO_ROOF_READ_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTask)) +#define UFO_IS_ROOF_READ_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ROOF_READ_TASK)) +#define UFO_ROOF_READ_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskClass)) +#define UFO_IS_ROOF_READ_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ROOF_READ_TASK)) +#define UFO_ROOF_READ_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskClass)) + +typedef struct _UfoRoofReadTask UfoRoofReadTask; +typedef struct _UfoRoofReadTaskClass UfoRoofReadTaskClass; +typedef struct _UfoRoofReadTaskPrivate UfoRoofReadTaskPrivate; + +struct _UfoRoofReadTask { + UfoTaskNode parent_instance; + + UfoRoofReadTaskPrivate *priv; +}; + +struct _UfoRoofReadTaskClass { + UfoTaskNodeClass parent_class; +}; + +UfoNode *ufo_roof_read_task_new (void); +GType ufo_roof_read_task_get_type (void); + +G_END_DECLS + +#endif diff --git a/src/ufo-roof-read.h b/src/ufo-roof-read.h new file mode 100644 index 0000000..50dbdf3 --- /dev/null +++ b/src/ufo-roof-read.h @@ -0,0 +1,17 @@ +#ifndef __UFO_ROOF_READ_H +#define __UFO_ROOF_READ_H + +#include "ufo-roof-config.h" + +typedef struct _UfoRoofReadInterface UfoRoofReadInterface; + +typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, void *buf, GError **error); +typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader); + +struct _UfoRoofReadInterface { + UfoRoofReaderRead read; + UfoRoofReaderClose close; +}; + + +#endif /* __UFO_ROOF_READ_H */ diff --git a/src/ufo-roof.h b/src/ufo-roof.h new file mode 100644 index 0000000..c3edda2 --- /dev/null +++ b/src/ufo-roof.h @@ -0,0 +1,20 @@ +#ifndef __UFO_ROOF_H +#define __UFO_ROOF_H + +#include "ufo-roof-config.h" +#include "ufo-roof-error.h" + +#define UFO_ROOF_PACKET_HEADER(buf) ((UfoRoofPacketHeader*)(buf)) +#define UFO_ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((UfoRoofPacketBlockHeader*)(((void*)buf) + cfg->max_packets * cfg->max_packet_size)) + +typedef struct { + uint32_t packet_id; // Sequential Packet ID (numbered from 0) +} UfoRoofPacketHeader; + +typedef struct { + uint32_t channel_id; // Specifies channel on which the data were received (numbered from 0) + uint32_t n_packets; // Number of packets +} UfoRoofPacketBlockHeader; + + +#endif /* __UFO_ROOF_H */ diff --git a/tests/roof.json b/tests/roof.json new file mode 100644 index 0000000..0dcd1e7 --- /dev/null +++ b/tests/roof.json @@ -0,0 +1,31 @@ +{ + "network": { + "protocol": "udp", + "port": 4000, + "streams": 16, + "payload_size": 1280, + "dataset_size": 1024000 + }, + "performance": { + "buffer_size": 2, + "packets_at_once": 100 + }, + "simulation": { + "first_file_number": 1, + "path": "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat" + }, + "setup": { + "planes": 2, + "modules": 16, + "bit_depth": 16, + "pixels_per_module": 16, + "samples_per_rotation": 2000000 + }, + "geometry": { + "fan_projections": 1000, + "fan_detectors": 432, + "parallel_projections": 512, + "parallel_detectors": 256, + "detector_diameter": 216 + } +} diff --git a/tests/roof.py b/tests/roof.py new file mode 100644 index 0000000..2138931 --- /dev/null +++ b/tests/roof.py @@ -0,0 +1,46 @@ +import gi +import sys +import json +import gobject + +from gi.repository import Ufo +from gi.repository import GObject + +class RoofConfig: + def __init__(self, config="roof.json"): + self.streams = 1 + + with open(config) as json_file: + cfg = json.load(json_file) + if cfg.get("network", {}).get("streams") != None: + self.streams = cfg["network"]["streams"] + elif cfg.get("setup", {}).get("modules") != None: + self.streams = cfg["setup"]["modules"] + +config = "roof.json" +cfg = RoofConfig(config) + +pm = Ufo.PluginManager() +graph = Ufo.TaskGraph() +scheduler = Ufo.Scheduler() + + +build = pm.get_task('roof-build') +build.set_properties(config=config, number=0) + +write = pm.get_task('write') +write.set_properties(filename="test.raw") + +for id in range(cfg.streams): + read = pm.get_task('roof-read') + read.set_properties(config=config, id=id) + graph.connect_nodes(read, build) + build.bind_property('stop', read, 'stop', GObject.BindingFlags.DEFAULT) + +#read_task.set_properties(path='/home/data/*.tif', start=10, number=100) +#graph.connect_nodes_full(read, write, 0) +graph.connect_nodes(build, write) + + + +scheduler.run(graph) diff --git a/tests/roof.sh b/tests/roof.sh new file mode 100755 index 0000000..d0ec30a --- /dev/null +++ b/tests/roof.sh @@ -0,0 +1,3 @@ +cat roof.yaml | yq . > roof.json + +GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" python roof.py diff --git a/tests/roof.yaml b/tests/roof.yaml new file mode 100644 index 0000000..a49d53b --- /dev/null +++ b/tests/roof.yaml @@ -0,0 +1,26 @@ +network: + protocol: udp + port: 4000 + streams: 16 +# header_size: 0 + payload_size: 1280 +# max_packet_size: 1284 + dataset_size: 1024000 +performance: + buffer_size: 2 + packets_at_once: 100 +simulation: + first_file_number: 1 + path: "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat" +setup: + planes: 2 + modules: 16 + bit_depth: 16 + pixels_per_module: 16 + samples_per_rotation: 2000000 +geometry: + fan_projections: 1000 + fan_detectors: 432 + parallel_projections: 512 + parallel_detectors: 256 + detector_diameter: 216 diff --git a/tests/test_file.sh b/tests/test_file.sh new file mode 100644 index 0000000..5f47e45 --- /dev/null +++ b/tests/test_file.sh @@ -0,0 +1,11 @@ +#! /bin/bash + +packet_size=1280 +packets_per_dataset=50 + +for packet in $(seq 0 24); do + for id in $(seq 0 15); do + name=$(ls *$id.dat | grep -P "_0?$id.dat") + dd if=$name of="roof_test.raw" bs=$packet_size count=$packets_per_dataset skip=$((packet * $packets_per_dataset)) oflag=append conv=notrunc + done +done |