#include #include "roof.h" #include "roof-thread.h" RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) { GError *gerr = NULL; RoofThreadContext *rdt = (RoofThreadContext*)calloc(1, sizeof(RoofThreadContext)); if (!rdt) roof_new_error(error, "Can't allocate RoofThreadContext"); rdt->cfg = cfg; rdt->rdi = rdi; return rdt; } void roof_thread_context_free(RoofThreadContext *rdt) { if (rdt) { free(rdt); } } int roof_thread_read_socket(Roof * int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) { Roof *ctx = (Roof*)data; RoofConfig *cfg = ctx->cfg; RoofThreadContext *rdt = ctx->rdt[id]; guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections }; guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] }; packet_id fragment_id = for (guint stream_id = from; stream_id < to; stream_id++) { uint8_t *rdbuf; guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr); if (gerr) roof_print_error(gerr); for (guint i = 0; i < packets; i++) { guint64 packet_id = 0; // Otherwise considered consecutive and handled by the buffer if (cfg->header_size >= sizeof(RoofPacketHeader)) { RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment); packet_id = be64toh(pheader->packet_id) + 1; } else { // FIXME: consider consecutive //fragment_id = ++buffer->stream_fragment[stream_id]; } // FIXME: packet may contain fragments for multiple datasets fragment_id = packet_id * fragments_per_packet; guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream; guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream; // FIXME: verify that packet is consecutive // if // Drop packets of already skipped datasets if (dataset_id < priv->current_id) continue; // FIXME: stop processing and return.... if (dataset_id < priv->current_id) uint8_t *fragment_buffer = dataset_buffer + stream_id * fragment_dims[0] + // x-coordinate (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); // buffer->last_id[stream_id] = dataset_id; ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); if (gerr) roof_print_error(gerr); fragment += cfg->max_packet_size; } } }