git » nmdb » commit 4b89315

Implement UDP in the server.

author Alberto Bertogli
2007-06-07 05:15:08 UTC
committer Alberto Bertogli
2007-06-07 05:15:08 UTC
parent 2dc32f844d36921b0d9bd576e82e98cc07887e5e

Implement UDP in the server.

It was quite easy, because it's almost identical to TIPC.

Completely tested, patches for the C library and the tests will follow.

Signed-off-by: Alberto Bertogli <albertito@gmail.com>

nmdb/Makefile +10 -1
nmdb/common.h +2 -0
nmdb/main.c +15 -0
nmdb/net-const.h +4 -0
nmdb/net.c +19 -3
nmdb/req.h +1 -0
nmdb/udp-stub.c +18 -0
nmdb/udp.c +248 -0
nmdb/udp.h +10 -0

diff --git a/nmdb/Makefile b/nmdb/Makefile
index c58d289..5c58c5a 100644
--- a/nmdb/Makefile
+++ b/nmdb/Makefile
@@ -1,10 +1,13 @@
 
 ENABLE_TIPC = 1
 ENABLE_TCP = 1
+ENABLE_UDP = 1
 
 CFLAGS += -std=c99 -Wall -O3
 ALL_CFLAGS = -D_XOPEN_SOURCE=500 $(CFLAGS)
-ALL_CFLAGS += -DENABLE_TIPC=$(ENABLE_TIPC) -DENABLE_TCP=$(ENABLE_TCP)
+ALL_CFLAGS += -DENABLE_TIPC=$(ENABLE_TIPC) \
+		-DENABLE_TCP=$(ENABLE_TCP) \
+		-DENABLE_UDP=$(ENABLE_UDP)
 
 ifdef DEBUG
 ALL_CFLAGS += -g
@@ -36,6 +39,12 @@ else
 	OBJS += tcp-stub.o
 endif
 
+ifeq ($(ENABLE_UDP), 1)
+	OBJS += udp.o
+else
+	OBJS += udp-stub.o
+endif
+
 
 default: all
 
diff --git a/nmdb/common.h b/nmdb/common.h
index 31d1548..2d6cf56 100644
--- a/nmdb/common.h
+++ b/nmdb/common.h
@@ -18,6 +18,8 @@ struct settings {
 	int tipc_upper;
 	char *tcp_addr;
 	int tcp_port;
+	char *udp_addr;
+	int udp_port;
 	int numobjs;
 	int foreground;
 	int passive;
diff --git a/nmdb/main.c b/nmdb/main.c
index 49e3816..4c91536 100644
--- a/nmdb/main.c
+++ b/nmdb/main.c
@@ -30,6 +30,8 @@ static void help(void) {
 	  "  -L upper	upper TIPC port number (= lower)\n"
 	  "  -a addr	TCP listening address (all local addresses)\n"
 	  "  -P port	TCP listening port (26010)\n"
+	  "  -U addr	UDP listening address (all local addresses)\n"
+	  "  -u port	UDP listening port (26010)\n"
 	  "  -c nobj	max. number of objects to be cached, in thousands (128)\n"
 	  "  -f		don't fork and stay in the foreground\n"
 	  "  -p		enable passive mode, for redundancy purposes (read docs.)\n"
@@ -49,6 +51,8 @@ static int load_settings(int argc, char **argv)
 	settings.tipc_upper = -1;
 	settings.tcp_addr = NULL;
 	settings.tcp_port = -1;
+	settings.udp_addr = NULL;
+	settings.udp_port = -1;
 	settings.numobjs = -1;
 	settings.foreground = 0;
 	settings.passive = 0;
@@ -75,6 +79,13 @@ static int load_settings(int argc, char **argv)
 		case 'P':
 			settings.tcp_port = atoi(optarg);
 			break;
+		case 'U':
+			settings.udp_addr = optarg;
+			break;
+		case 'u':
+			settings.udp_port = atoi(optarg);
+			break;
+
 		case 'c':
 			settings.numobjs = atoi(optarg) * 1024;
 			break;
@@ -102,6 +113,10 @@ static int load_settings(int argc, char **argv)
 		settings.tcp_addr = TCP_SERVER_ADDR;
 	if (settings.tcp_port == -1)
 		settings.tcp_port = TCP_SERVER_PORT;
+	if (settings.udp_addr == NULL)
+		settings.udp_addr = UDP_SERVER_ADDR;
+	if (settings.udp_port == -1)
+		settings.udp_port = UDP_SERVER_PORT;
 	if (settings.numobjs == -1)
 		settings.numobjs = 128 * 1024;
 
diff --git a/nmdb/net-const.h b/nmdb/net-const.h
index 16871e7..bdaa29d 100644
--- a/nmdb/net-const.h
+++ b/nmdb/net-const.h
@@ -15,6 +15,10 @@
 #define TCP_SERVER_ADDR "0.0.0.0"
 #define TCP_SERVER_PORT 26010
 
+/* UDP default listen address and port. */
+#define UDP_SERVER_ADDR "0.0.0.0"
+#define UDP_SERVER_PORT 26010
+
 /* Protocol version, for checking in the network header. */
 #define PROTO_VER 1
 
diff --git a/nmdb/net.c b/nmdb/net.c
index 0e61c5e..a376c17 100644
--- a/nmdb/net.c
+++ b/nmdb/net.c
@@ -12,6 +12,7 @@ typedef unsigned char u_char;
 #include "common.h"
 #include "tipc.h"
 #include "tcp.h"
+#include "udp.h"
 #include "net.h"
 
 
@@ -31,12 +32,14 @@ void net_loop(void)
 {
 	int tipc_fd = -1;
 	int tcp_fd = -1;
-	struct event tipc_evt, tcp_evt, sigterm_evt, sigint_evt, sigusr2_evt;
+	int udp_fd = -1;
+	struct event tipc_evt, tcp_evt, udp_evt,
+		     sigterm_evt, sigint_evt, sigusr2_evt;
 
 	event_init();
 
-	/* ENABLE_TIPC and ENABLE_TCP are preprocessor constants defined on
-	 * the command line by make. */
+	/* ENABLE_* are preprocessor constants defined on the command line by
+	 * make. */
 
 	if (ENABLE_TIPC) {
 		tipc_fd = tipc_init();
@@ -62,6 +65,18 @@ void net_loop(void)
 		event_add(&tcp_evt, NULL);
 	}
 
+	if (ENABLE_UDP) {
+		udp_fd = udp_init();
+		if (udp_fd < 0) {
+			perror("Error initializing UDP");
+			exit(1);
+		}
+
+		event_set(&udp_evt, udp_fd, EV_READ | EV_PERSIST, udp_recv,
+				&udp_evt);
+		event_add(&udp_evt, NULL);
+	}
+
 	signal_set(&sigterm_evt, SIGTERM, exit_sighandler, &sigterm_evt);
 	signal_add(&sigterm_evt, NULL);
 	signal_set(&sigint_evt, SIGINT, exit_sighandler, &sigint_evt);
@@ -80,6 +95,7 @@ void net_loop(void)
 
 	tipc_close(tipc_fd);
 	tcp_close(tcp_fd);
+	udp_close(udp_fd);
 }
 
 
diff --git a/nmdb/req.h b/nmdb/req.h
index 4c52d59..454f0f8 100644
--- a/nmdb/req.h
+++ b/nmdb/req.h
@@ -10,6 +10,7 @@
 /* req_info types, according to the protocol */
 #define REQTYPE_TIPC 1
 #define REQTYPE_TCP 2
+#define REQTYPE_UDP 3
 
 
 struct req_info {
diff --git a/nmdb/udp-stub.c b/nmdb/udp-stub.c
new file mode 100644
index 0000000..1fbc770
--- /dev/null
+++ b/nmdb/udp-stub.c
@@ -0,0 +1,18 @@
+
+/* UDP stub file, used when UDP is not compiled in. */
+
+/* Stub used when UDP is not compiled in: there is never a socket to hand
+ * out, so this always reports failure. */
+int udp_init(void)
+{
+	const int no_socket = -1;
+
+	return no_socket;
+}
+
+/* Stub used when UDP is not compiled in: nothing was opened, so there is
+ * nothing to close. */
+void udp_close(int fd)
+{
+	(void) fd;
+}
+
+/* Stub used when UDP is not compiled in: ignores its arguments and does
+ * nothing. */
+void udp_recv(int fd, short event, void *arg)
+{
+	(void) fd;
+	(void) event;
+	(void) arg;
+}
+
diff --git a/nmdb/udp.c b/nmdb/udp.c
new file mode 100644
index 0000000..40d9747
--- /dev/null
+++ b/nmdb/udp.c
@@ -0,0 +1,248 @@
+
+#include <sys/types.h>		/* socket defines */
+#include <sys/socket.h>		/* socket functions */
+#include <errno.h>		/* errno, EINVAL */
+#include <stdlib.h>		/* malloc() */
+#include <stdio.h>		/* perror() */
+#include <stdint.h>		/* uint32_t and friends */
+#include <arpa/inet.h>		/* htonls() and friends */
+#include <netinet/in.h>		/* INET stuff */
+#include <netinet/udp.h>	/* UDP stuff */
+#include <string.h>		/* memcpy() */
+#include <unistd.h>		/* close() */
+
+#include "udp.h"
+#include "common.h"
+#include "net-const.h"
+#include "req.h"
+#include "parse.h"
+
+
+static void udp_mini_reply(struct req_info *req, uint32_t reply);
+static void udp_reply_err(struct req_info *req, uint32_t reply);
+static void udp_reply_get(struct req_info *req, uint32_t reply,
+		unsigned char *val, size_t vsize);
+static void udp_reply_set(struct req_info *req, uint32_t reply);
+static void udp_reply_del(struct req_info *req, uint32_t reply);
+static void udp_reply_cas(struct req_info *req, uint32_t reply);
+
+
+/*
+ * Miscellaneous helper functions
+ */
+
+/*
+ * Sends an error reply carrying the given code to the client that issued
+ * req. Does nothing in passive mode. A failed send is only reported with
+ * perror(), since there is no way left to tell the client about it.
+ */
+static void rep_send_error(const struct req_info *req, const unsigned int code)
+{
+	ssize_t rv;
+	uint32_t r, c;
+	unsigned char minibuf[3 * 4];
+
+	if (settings.passive)
+		return;
+
+	/* Network format: ID (4), REP_ERR (4), error code (4).
+	 * The reply and error codes are kept in fixed-width variables so
+	 * that exactly 4 bytes are copied regardless of the size of int.
+	 * The ID is copied as is, without any byte order conversion. */
+	r = htonl(REP_ERR);
+	c = htonl(code);
+	memcpy(minibuf, &(req->id), 4);
+	memcpy(minibuf + 4, &r, sizeof(r));
+	memcpy(minibuf + 8, &c, sizeof(c));
+
+	/* If this send fails, there's nothing to be done */
+	rv = sendto(req->fd, minibuf, sizeof(minibuf), 0, req->clisa,
+			req->clilen);
+	if (rv < 0) {
+		perror("rep_send_error() failed");
+	}
+}
+
+
+/*
+ * Sends size bytes from buf to the client that issued req.
+ *
+ * Returns 1 when sendto() succeeded, or when running in passive mode (where
+ * nothing is sent at all). Returns 0 if sendto() failed, after trying to
+ * notify the client with an ERR_SEND error reply.
+ */
+static int rep_send(const struct req_info *req, const unsigned char *buf,
+		const size_t size)
+{
+	if (!settings.passive) {
+		int sent = sendto(req->fd, buf, size, 0, req->clisa,
+				req->clilen);
+
+		if (sent < 0) {
+			rep_send_error(req, ERR_SEND);
+			return 0;
+		}
+	}
+
+	return 1;
+}
+
+
+/* Sends a small reply made only of the request ID followed by the reply
+ * code, the latter in network byte order. Nothing is sent in passive mode. */
+static void udp_mini_reply(struct req_info *req, uint32_t reply)
+{
+	/* Replies this small are assembled in a fixed stack buffer, which
+	 * spares us the malloc() overhead. */
+	unsigned char pkt[2 * 4];
+	uint32_t code;
+
+	if (settings.passive)
+		return;
+
+	code = htonl(reply);
+	memcpy(pkt, &(req->id), 4);
+	memcpy(pkt + 4, &code, 4);
+	rep_send(req, pkt, sizeof(pkt));
+}
+
+
+/* The udp_reply_* functions are used by the db code to send the network
+ * replies. */
+
+void udp_reply_err(struct req_info *req, uint32_t reply)
+{
+	rep_send_error(req, reply);
+}
+
+/*
+ * Replies to a get request.
+ *
+ * When val is NULL (a miss) only the reply code is sent. Otherwise the
+ * reply carries the value too, laid out as:
+ *	4	id
+ *	4	reply code
+ *	4	vsize
+ *	vsize	val
+ * where the reply code and vsize are in network byte order.
+ *
+ * If the reply buffer cannot be allocated, an ERR_SEND error reply is sent
+ * to the client instead of the value.
+ */
+static void udp_reply_get(struct req_info *req, uint32_t reply,
+			unsigned char *val, size_t vsize)
+{
+	unsigned char *buf;
+	size_t bsize;
+	uint32_t nreply, nvsize;
+
+	if (val == NULL) {
+		/* miss */
+		udp_mini_reply(req, reply);
+		return;
+	}
+
+	/* Nothing gets sent in passive mode, so there is no point in
+	 * building the reply. */
+	if (settings.passive)
+		return;
+
+	bsize = 4 + 4 + 4 + vsize;
+	buf = malloc(bsize);
+	if (buf == NULL) {
+		/* We can't build the reply; let the client know that it is
+		 * not going to get one rather than crashing in memcpy(). */
+		rep_send_error(req, ERR_SEND);
+		return;
+	}
+
+	nreply = htonl(reply);
+	nvsize = htonl(vsize);
+
+	memcpy(buf, &(req->id), 4);
+	memcpy(buf + 4, &nreply, 4);
+	memcpy(buf + 8, &nvsize, 4);
+	memcpy(buf + 12, val, vsize);
+
+	rep_send(req, buf, bsize);
+	free(buf);
+}
+
+
+/* Set reply: consists only of the reply code, so a mini reply is enough. */
+static void udp_reply_set(struct req_info *req, uint32_t reply)
+{
+	udp_mini_reply(req, reply);
+	return;
+}
+
+
+/* Del reply: consists only of the reply code, so a mini reply is enough. */
+static void udp_reply_del(struct req_info *req, uint32_t reply)
+{
+	udp_mini_reply(req, reply);
+	return;
+}
+
+/* Cas reply: consists only of the reply code, so a mini reply is enough. */
+static void udp_reply_cas(struct req_info *req, uint32_t reply)
+{
+	udp_mini_reply(req, reply);
+	return;
+}
+
+
+/*
+ * Main functions for receiving and parsing
+ */
+
+/*
+ * Creates the UDP socket and binds it to the address and port given by
+ * settings.udp_addr and settings.udp_port.
+ *
+ * Returns the socket's file descriptor, or -1 on error. On failure no
+ * descriptor is left open and errno describes the cause, so the caller can
+ * report it with perror().
+ */
+int udp_init(void)
+{
+	int fd, rv, saved_errno;
+	struct sockaddr_in srvsa;
+
+	/* htons() would silently truncate anything that doesn't fit in 16
+	 * bits, and we would end up listening on an unexpected port. */
+	if (settings.udp_port < 0 || settings.udp_port > 65535) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	/* Start from a zeroed structure so that no field is left holding
+	 * stack garbage. */
+	memset(&srvsa, 0, sizeof(srvsa));
+	srvsa.sin_family = AF_INET;
+	srvsa.sin_port = htons(settings.udp_port);
+
+	rv = inet_pton(AF_INET, settings.udp_addr, &srvsa.sin_addr);
+	if (rv <= 0) {
+		/* inet_pton() returns 0, without setting errno, when the
+		 * string is not a valid address. */
+		if (rv == 0)
+			errno = EINVAL;
+		return -1;
+	}
+
+	fd = socket(AF_INET, SOCK_DGRAM, 0);
+	if (fd < 0)
+		return -1;
+
+	rv = 1;
+	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rv, sizeof(rv)) < 0)
+		goto error;
+
+	if (bind(fd, (struct sockaddr *) &srvsa, sizeof(srvsa)) < 0)
+		goto error;
+
+	return fd;
+
+error:
+	/* Don't leak the socket, and don't let close() clobber the errno of
+	 * the call that actually failed. */
+	saved_errno = errno;
+	close(fd);
+	errno = saved_errno;
+	return -1;
+}
+
+
+/* Closes the UDP socket fd. */
+void udp_close(int fd)
+{
+	close(fd);
+	return;
+}
+
+
+/*
+ * Receive callback, called by libevent for each receive event on the UDP
+ * socket. Reads one message, fills in a request descriptor that points
+ * back at the sender, and hands the message over to parse_message().
+ * The event and arg parameters are not used.
+ */
+void udp_recv(int fd, short event, void *arg)
+{
+	/* The receive buffer lives on the stack instead of being malloc()ed:
+	 * that made each set operation go from 27 usec to 23 usec, and test1
+	 * go from 3.213s to 2.345s for 37618 operations.
+	 * TODO: check for negative impacts (beside being ugly, obviously)
+	 */
+	unsigned char buf[68 * 1024];
+	struct sockaddr_in clisa;
+	socklen_t clilen = sizeof(clisa);
+	struct req_info req;
+	int len;
+
+	len = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *) &clisa,
+			&clilen);
+
+	/* Errors (-1) and empty reads (0) are silently ignored. */
+	if (len <= 0)
+		return;
+
+	/* Messages shorter than 2 bytes are counted as broken requests and
+	 * dropped. */
+	if (len < 2) {
+		stats.net_broken_req++;
+		return;
+	}
+
+	/* Describe the request: where it came from, and which functions
+	 * send the replies back over UDP. */
+	req.fd = fd;
+	req.type = REQTYPE_UDP;
+	req.clisa = (struct sockaddr *) &clisa;
+	req.clilen = clilen;
+	req.mini_reply = udp_mini_reply;
+	req.reply_err = udp_reply_err;
+	req.reply_get = udp_reply_get;
+	req.reply_set = udp_reply_set;
+	req.reply_del = udp_reply_del;
+	req.reply_cas = udp_reply_cas;
+
+	/* parse the message */
+	parse_message(&req, buf, len);
+}
+
+
diff --git a/nmdb/udp.h b/nmdb/udp.h
new file mode 100644
index 0000000..ea473cf
--- /dev/null
+++ b/nmdb/udp.h
@@ -0,0 +1,10 @@
+
+#ifndef UDP_H
+#define UDP_H
+
+/*
+ * UDP transport entry points.
+ * The include guard avoids a leading underscore followed by an uppercase
+ * letter, because such identifiers are reserved for the implementation.
+ */
+
+/* Creates the listening UDP socket; returns its descriptor, < 0 on error. */
+int udp_init(void);
+
+/* Closes the socket fd. */
+void udp_close(int fd);
+
+/* libevent callback, to be registered for read events on the UDP socket. */
+void udp_recv(int fd, short event, void *arg);
+
+#endif
+