summaryrefslogtreecommitdiffstats
path: root/src/ufo-roof-read-socket.c
blob: 7bbe8efe13dd40d508a3265fd32c8d949a0b6377 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#define _GNU_SOURCE

#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>

#include "glib.h"

#include "ufo-roof.h"
#include "ufo-roof-read-socket.h"

typedef struct {
    UfoRoofReadInterface iface;

    UfoRoofConfig *cfg;
    int socket;
} UfoRoofReadSocket;

static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
    UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
    
    if (reader) {
        if (reader->socket >= 0) 
            close(reader->socket);
        free(reader);
    }
}

static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) {
    struct timespec timeout_ts;

    UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
    UfoRoofConfig *cfg = reader->cfg;

    struct mmsghdr msg[cfg->max_packets];
    struct iovec msgvec[cfg->max_packets];

    timeout_ts.tv_sec = cfg->network_timeout / 1000000;
    timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000);

        // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build?
    memset(msg, 0, sizeof(msg));
    memset(msgvec, 0, sizeof(msgvec));
    for (guint i = 0; i < cfg->max_packets; i++) {
        msgvec[i].iov_base = buf + i * cfg->max_packet_size;
        msgvec[i].iov_len = cfg->max_packet_size;
        msg[i].msg_hdr.msg_iov = &msgvec[i];
        msg[i].msg_hdr.msg_iovlen = 1;
    }

	// Timeout seems broken, see BUGS in 'recvmmsg' bugs page
    int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
    if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);

        // FIXME: Shall we verify packets consistency here? We can check at least the sizes...

    return (guint)packets;
}


UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) {
    int err;
    int port = cfg->port + id;
    char port_str[16];
    const char *addr_str = "0.0.0.0";
    struct addrinfo sockaddr_hints;
    struct addrinfo *sockaddr_info;

    UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket));
    if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket");

    reader->cfg = cfg;
    reader->iface.close = ufo_roof_read_socket_free;
    reader->iface.read =ufo_roof_read_socket;
    
    snprintf(port_str, sizeof(port_str), "%d", port);
    port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0';

    memset(&sockaddr_hints, 0, sizeof(sockaddr_hints));
    if (!strncmp(cfg->protocol, "udp", 3)) {
        sockaddr_hints.ai_family = AF_UNSPEC;
        sockaddr_hints.ai_socktype = SOCK_DGRAM;
        sockaddr_hints.ai_protocol = IPPROTO_UDP;
    } else if (!strncmp(cfg->protocol, "tcp", 3)) {
        sockaddr_hints.ai_family = AF_UNSPEC;
        sockaddr_hints.ai_socktype = SOCK_STREAM;
        sockaddr_hints.ai_protocol = IPPROTO_TCP;
    } else {
        roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol);
    }

    err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
    if (err || !sockaddr_info) {
        free(reader);
        roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str);
    }
    
    reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol);
    if(reader->socket == -1) {
        freeaddrinfo(sockaddr_info);
        free(reader);
        roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str);
    }

    err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
    if(err != 0) {
        freeaddrinfo(sockaddr_info);
        close(reader->socket);
        free(reader);
        roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str);
    }

    freeaddrinfo(sockaddr_info);

    return (UfoRoofReadInterface*)reader;
}