summaryrefslogtreecommitdiffstats
path: root/src/roof-read-socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/roof-read-socket.c')
-rw-r--r--src/roof-read-socket.c159
1 files changed, 159 insertions, 0 deletions
diff --git a/src/roof-read-socket.c b/src/roof-read-socket.c
new file mode 100644
index 0000000..1b98b44
--- /dev/null
+++ b/src/roof-read-socket.c
@@ -0,0 +1,159 @@
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <unistd.h>
+#include <assert.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-read-socket.h"
+
+typedef struct {
+ RoofReadInterface iface;
+
+ RoofConfig *cfg;
+
+ int socket;
+ struct mmsghdr *msg;
+ struct iovec *msgvec;
+ uint8_t *buf;
+
+ int libvma;
+} RoofReadSocket;
+
+static void roof_read_socket_free(RoofReadInterface *iface) {
+ RoofReadSocket *reader = (RoofReadSocket*)iface;
+
+ if (reader) {
+ if (reader->socket >= 0)
+ close(reader->socket);
+ if (reader->msgvec)
+ free(reader->msgvec);
+ if (reader->msg)
+ free(reader->msg);
+ if (reader->buf)
+ free(reader->buf);
+ free(reader);
+ }
+}
+
+static guint roof_read_socket(RoofReadInterface *iface, uint8_t **buf, GError **error) {
+ int packets;
+ struct timespec timeout_ts;
+
+ assert(iface);
+ assert(buf);
+
+ RoofReadSocket *reader = (RoofReadSocket*)iface;
+ RoofConfig *cfg = reader->cfg;
+
+ timeout_ts.tv_sec = cfg->network_timeout / 1000000;
+ timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000);
+
+
+//retry:
+ // Timeout seems broken, see BUGS in 'recvmmsg' bugs page
+ packets = recvmmsg(reader->socket, reader->msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
+ if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
+
+/*
+ // FIXME: Shall we verify packets consistency here? We can check at least the sizes...
+ if ((packets == 1)&&(msg[0].msg_len < 16)) {
+ goto retry;
+ }
+*/
+
+ *buf = reader->buf;
+ return (guint)packets;
+}
+
+RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error) {
+ int err;
+ int port = cfg->port + id;
+ char port_str[16];
+ const char *addr_str = "192.168.100.8";
+ struct addrinfo sockaddr_hints;
+ struct addrinfo *sockaddr_info;
+
+ RoofReadSocket *reader = (RoofReadSocket*)calloc(1, sizeof(RoofReadSocket));
+ if (!reader) roof_new_error(error, "Can't allocate RoofReadSocket");
+
+ reader->cfg = cfg;
+ reader->iface.close = roof_read_socket_free;
+ reader->iface.read =roof_read_socket;
+
+ snprintf(port_str, sizeof(port_str), "%d", port);
+ port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0';
+
+ memset(&sockaddr_hints, 0, sizeof(sockaddr_hints));
+ if (!strncmp(cfg->protocol, "udp", 3)) {
+ sockaddr_hints.ai_family = AF_UNSPEC;
+ sockaddr_hints.ai_socktype = SOCK_DGRAM;
+ sockaddr_hints.ai_protocol = IPPROTO_UDP;
+ } else if (!strncmp(cfg->protocol, "tcp", 3)) {
+ sockaddr_hints.ai_family = AF_UNSPEC;
+ sockaddr_hints.ai_socktype = SOCK_STREAM;
+ sockaddr_hints.ai_protocol = IPPROTO_TCP;
+ } else {
+ roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol);
+ }
+
+ err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
+ if (err || !sockaddr_info) {
+ free(reader);
+ roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str);
+ }
+
+ reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol);
+ if(reader->socket == -1) {
+ freeaddrinfo(sockaddr_info);
+ free(reader);
+ roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str);
+ }
+
+ err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
+ if(err != 0) {
+ freeaddrinfo(sockaddr_info);
+ close(reader->socket);
+ free(reader);
+ roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str);
+ }
+
+/*
+ // Check that FPGA module is ready
+ char msg[4];
+ addr_str = "192.168.34.83";
+ getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
+ sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
+*/
+ freeaddrinfo(sockaddr_info);
+
+ if (reader->libvma) {
+ } else {
+ reader->buf = (uint8_t*)malloc(cfg->max_packets * cfg->max_packet_size);
+ reader->msg = (struct mmsghdr*)malloc(cfg->max_packets * sizeof(struct mmsghdr));
+ reader->msgvec = (struct iovec*)malloc(cfg->max_packets * sizeof(struct iovec));
+
+ if ((!reader->buf)||(!reader->msg)||(!reader->msgvec)) {
+ roof_read_socket_free((RoofReadInterface*)reader);
+ roof_new_error(error, "Can't allocate socket buffer");
+ }
+
+ memset(reader->msg, 0, cfg->max_packets * sizeof(struct mmsghdr));
+ memset(reader->msgvec, 0, cfg->max_packets * sizeof(struct iovec));
+ for (guint i = 0; i < cfg->max_packets; i++) {
+ reader->msgvec[i].iov_base = reader->buf + i * cfg->max_packet_size;
+ reader->msgvec[i].iov_len = cfg->max_packet_size;
+ reader->msg[i].msg_hdr.msg_iov = &reader->msgvec[i];
+ reader->msg[i].msg_hdr.msg_iovlen = 1;
+ }
+ }
+
+
+ return (RoofReadInterface*)reader;
+}