1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include "glib.h"
#include "ufo-roof.h"
#include "ufo-roof-read-socket.h"
/*
 * Socket-backed reader object.
 *
 * `iface` is the first member: the functions in this file cast between
 * UfoRoofReadInterface* and UfoRoofReadSocket*, so the member order must not change.
 */
typedef struct {
// Callback table handed out to callers; `read` and `close` are filled in by ufo_roof_read_socket_new
UfoRoofReadInterface iface;
// Configuration passed to the constructor; only the pointer is stored and it is not freed by the reader
UfoRoofConfig *cfg;
// Descriptor of the bound socket; closed by ufo_roof_read_socket_free when it is >= 0
int socket;
} UfoRoofReadSocket;
/*
 * Destroy a socket reader: close its descriptor when it is non-negative and
 * release the reader object itself. A NULL interface pointer is accepted and ignored.
 */
static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
    UfoRoofReadSocket *self = (UfoRoofReadSocket*)iface;

    if (self == NULL)
        return;

    if (self->socket >= 0)
        close(self->socket);

    free(self);
}
/*
 * Receive a batch of packets from the reader's socket with one recvmmsg() call.
 *
 * Up to cfg->max_packets messages are requested. Message number i is stored in
 * `buf` at byte offset i * cfg->max_packet_size and may occupy at most
 * cfg->max_packet_size bytes, so `buf` has to provide room for
 * max_packets * max_packet_size bytes.
 *
 * Returns the number of messages reported by recvmmsg(). If recvmmsg() fails,
 * the failure (including errno) is reported through
 * roof_network_error_with_retval with a return value of 0.
 */
static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) {
    UfoRoofReadSocket *self = (UfoRoofReadSocket*)iface;
    UfoRoofConfig *cfg = self->cfg;

    // One message header and one scatter/gather entry per expected packet
    struct mmsghdr headers[cfg->max_packets];
    struct iovec vectors[cfg->max_packets];

    // cfg->network_timeout is split into whole seconds and the remainder scaled by 1000 to nanoseconds
    struct timespec wait_limit = {
        .tv_sec = cfg->network_timeout / 1000000,
        .tv_nsec = 1000 * (cfg->network_timeout % 1000000)
    };

    // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build?
    memset(headers, 0, sizeof(headers));
    memset(vectors, 0, sizeof(vectors));

    for (guint n = 0; n < cfg->max_packets; n++) {
        struct iovec *vec = &vectors[n];

        // Each packet gets its own fixed-size slot inside the caller's buffer
        vec->iov_base = buf + n * cfg->max_packet_size;
        vec->iov_len = cfg->max_packet_size;

        headers[n].msg_hdr.msg_iov = vec;
        headers[n].msg_hdr.msg_iovlen = 1;
    }

    // Timeout seems broken, see the BUGS section of the 'recvmmsg' manual page
    int received = recvmmsg(self->socket, headers, cfg->max_packets, MSG_WAITFORONE, &wait_limit);
    if (received < 0) {
        roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
    }

    // FIXME: Shall we verify packets consistency here? We can check at least the sizes...
    return (guint)received;
}
/**
 * ufo_roof_read_socket_new:
 * @cfg: configuration; `port` and `protocol` are read here and the pointer is
 *       stored in the reader (it is not copied)
 * @id: reader index, added to cfg->port to select the port to bind
 * @error: error location forwarded to roof_new_error
 *
 * Creates a reader whose socket is bound to 0.0.0.0 on port (cfg->port + id).
 * cfg->protocol selects the socket type: a value starting with "udp" gives a
 * datagram socket, one starting with "tcp" a stream socket; anything else is
 * rejected.
 *
 * Every failure path releases whatever was acquired so far and then reports
 * through roof_new_error.
 * NOTE(review): roof_new_error is assumed to set the error and return from this
 * function (the free-then-report pattern below relies on that) - confirm
 * against its definition.
 *
 * Returns: the interface of the new reader; release it through its `close`
 * callback.
 */
UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) {
    int err;
    int port = cfg->port + id;
    char port_str[16];
    const char *addr_str = "0.0.0.0";
    struct addrinfo sockaddr_hints;
    struct addrinfo *sockaddr_info = NULL;

    UfoRoofReadSocket *reader = calloc(1, sizeof(*reader));
    if (!reader) {
        roof_new_error(error, "Can't allocate UfoRoofReadSocket");
    }

    reader->cfg = cfg;
    reader->iface.close = ufo_roof_read_socket_free;
    reader->iface.read = ufo_roof_read_socket;

    // snprintf always NUL-terminates within the given size, no manual termination needed
    snprintf(port_str, sizeof(port_str), "%d", port);

    memset(&sockaddr_hints, 0, sizeof(sockaddr_hints));
    sockaddr_hints.ai_family = AF_UNSPEC;
    if (!strncmp(cfg->protocol, "udp", 3)) {
        sockaddr_hints.ai_socktype = SOCK_DGRAM;
        sockaddr_hints.ai_protocol = IPPROTO_UDP;
    } else if (!strncmp(cfg->protocol, "tcp", 3)) {
        sockaddr_hints.ai_socktype = SOCK_STREAM;
        sockaddr_hints.ai_protocol = IPPROTO_TCP;
    } else {
        // The reader is already allocated at this point; release it before bailing out
        free(reader);
        roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol);
    }

    err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
    if (err || !sockaddr_info) {
        free(reader);
        roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str);
    }

    reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol);
    if (reader->socket == -1) {
        freeaddrinfo(sockaddr_info);
        free(reader);
        roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str);
    }

    if (bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen) != 0) {
        // bind() only ever returns -1 on failure; the useful code is errno,
        // which must be saved before the cleanup calls can overwrite it
        err = errno;
        freeaddrinfo(sockaddr_info);
        close(reader->socket);
        free(reader);
        roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str);
    }

    freeaddrinfo(sockaddr_info);

    return (UfoRoofReadInterface*)reader;
}
|