author | Alberto Bertogli
<albertito@gmail.com> 2007-06-07 05:15:08 UTC |
committer | Alberto Bertogli
<albertito@gmail.com> 2007-06-07 05:15:08 UTC |
parent | 2dc32f844d36921b0d9bd576e82e98cc07887e5e |
nmdb/Makefile | +10 | -1 |
nmdb/common.h | +2 | -0 |
nmdb/main.c | +15 | -0 |
nmdb/net-const.h | +4 | -0 |
nmdb/net.c | +19 | -3 |
nmdb/req.h | +1 | -0 |
nmdb/udp-stub.c | +18 | -0 |
nmdb/udp.c | +248 | -0 |
nmdb/udp.h | +10 | -0 |
diff --git a/nmdb/Makefile b/nmdb/Makefile index c58d289..5c58c5a 100644 --- a/nmdb/Makefile +++ b/nmdb/Makefile @@ -1,10 +1,13 @@ ENABLE_TIPC = 1 ENABLE_TCP = 1 +ENABLE_UDP = 1 CFLAGS += -std=c99 -Wall -O3 ALL_CFLAGS = -D_XOPEN_SOURCE=500 $(CFLAGS) -ALL_CFLAGS += -DENABLE_TIPC=$(ENABLE_TIPC) -DENABLE_TCP=$(ENABLE_TCP) +ALL_CFLAGS += -DENABLE_TIPC=$(ENABLE_TIPC) \ + -DENABLE_TCP=$(ENABLE_TCP) \ + -DENABLE_UDP=$(ENABLE_UDP) ifdef DEBUG ALL_CFLAGS += -g @@ -36,6 +39,12 @@ else OBJS += tcp-stub.o endif +ifeq ($(ENABLE_UDP), 1) + OBJS += udp.o +else + OBJS += udp-stub.o +endif + default: all diff --git a/nmdb/common.h b/nmdb/common.h index 31d1548..2d6cf56 100644 --- a/nmdb/common.h +++ b/nmdb/common.h @@ -18,6 +18,8 @@ struct settings { int tipc_upper; char *tcp_addr; int tcp_port; + char *udp_addr; + int udp_port; int numobjs; int foreground; int passive; diff --git a/nmdb/main.c b/nmdb/main.c index 49e3816..4c91536 100644 --- a/nmdb/main.c +++ b/nmdb/main.c @@ -30,6 +30,8 @@ static void help(void) { " -L upper upper TIPC port number (= lower)\n" " -a addr TCP listening address (all local addresses)\n" " -P port TCP listening port (26010)\n" + " -U addr UDP listening address (all local addresses)\n" + " -u port UDP listening port (26010)\n" " -c nobj max. number of objects to be cached, in thousands (128)\n" " -f don't fork and stay in the foreground\n" " -p enable passive mode, for redundancy purposes (read docs.)\n" @@ -49,6 +51,8 @@ static int load_settings(int argc, char **argv) settings.tipc_upper = -1; settings.tcp_addr = NULL; settings.tcp_port = -1; + settings.udp_addr = NULL; + settings.udp_port = -1; settings.numobjs = -1; settings.foreground = 0; settings.passive = 0; @@ -75,6 +79,13 @@ static int load_settings(int argc, char **argv) case 'P': settings.tcp_port = atoi(optarg); break; + case 'U': + settings.udp_addr = optarg; + break; + case 'u': + settings.udp_port = atoi(optarg); + break; + case 'c': settings.numobjs = atoi(optarg) * 1024; break; @@ -102,6 +113,10 @@ static int load_settings(int argc, char **argv) settings.tcp_addr = TCP_SERVER_ADDR; if (settings.tcp_port == -1) settings.tcp_port = TCP_SERVER_PORT; + if (settings.udp_addr == NULL) + settings.udp_addr = UDP_SERVER_ADDR; + if (settings.udp_port == -1) + settings.udp_port = UDP_SERVER_PORT; if (settings.numobjs == -1) settings.numobjs = 128 * 1024; diff --git a/nmdb/net-const.h b/nmdb/net-const.h index 16871e7..bdaa29d 100644 --- a/nmdb/net-const.h +++ b/nmdb/net-const.h @@ -15,6 +15,10 @@ #define TCP_SERVER_ADDR "0.0.0.0" #define TCP_SERVER_PORT 26010 +/* UDP default listen address and port. */ +#define UDP_SERVER_ADDR "0.0.0.0" +#define UDP_SERVER_PORT 26010 + /* Protocol version, for checking in the network header. */ #define PROTO_VER 1 diff --git a/nmdb/net.c b/nmdb/net.c index 0e61c5e..a376c17 100644 --- a/nmdb/net.c +++ b/nmdb/net.c @@ -12,6 +12,7 @@ typedef unsigned char u_char; #include "common.h" #include "tipc.h" #include "tcp.h" +#include "udp.h" #include "net.h" @@ -31,12 +32,14 @@ void net_loop(void) { int tipc_fd = -1; int tcp_fd = -1; - struct event tipc_evt, tcp_evt, sigterm_evt, sigint_evt, sigusr2_evt; + int udp_fd = -1; + struct event tipc_evt, tcp_evt, udp_evt, + sigterm_evt, sigint_evt, sigusr2_evt; event_init(); - /* ENABLE_TIPC and ENABLE_TCP are preprocessor constants defined on - * the command line by make. */ + /* ENABLE_* are preprocessor constants defined on the command line by + * make. */ if (ENABLE_TIPC) { tipc_fd = tipc_init(); @@ -62,6 +65,18 @@ void net_loop(void) event_add(&tcp_evt, NULL); } + if (ENABLE_UDP) { + udp_fd = udp_init(); + if (udp_fd < 0) { + perror("Error initializing UDP"); + exit(1); + } + + event_set(&udp_evt, udp_fd, EV_READ | EV_PERSIST, udp_recv, + &udp_evt); + event_add(&udp_evt, NULL); + } + signal_set(&sigterm_evt, SIGTERM, exit_sighandler, &sigterm_evt); signal_add(&sigterm_evt, NULL); signal_set(&sigint_evt, SIGINT, exit_sighandler, &sigint_evt); @@ -80,6 +95,7 @@ void net_loop(void) tipc_close(tipc_fd); tcp_close(tcp_fd); + udp_close(udp_fd); } diff --git a/nmdb/req.h b/nmdb/req.h index 4c52d59..454f0f8 100644 --- a/nmdb/req.h +++ b/nmdb/req.h @@ -10,6 +10,7 @@ /* req_info types, according to the protocol */ #define REQTYPE_TIPC 1 #define REQTYPE_TCP 2 +#define REQTYPE_UDP 3 struct req_info { diff --git a/nmdb/udp-stub.c b/nmdb/udp-stub.c new file mode 100644 index 0000000..1fbc770 --- /dev/null +++ b/nmdb/udp-stub.c @@ -0,0 +1,18 @@ + +/* UDP stub file, used when UDP is not compiled in. */ + +int udp_init(void) +{ + return -1; +} + +void udp_close(int fd) +{ + return; +} + +void udp_recv(int fd, short event, void *arg) +{ + return; +} + diff --git a/nmdb/udp.c b/nmdb/udp.c new file mode 100644 index 0000000..40d9747 --- /dev/null +++ b/nmdb/udp.c @@ -0,0 +1,248 @@ + +#include <sys/types.h> /* socket defines */ +#include <sys/socket.h> /* socket functions */ +#include <stdlib.h> /* malloc() */ +#include <stdio.h> /* perror() */ +#include <stdint.h> /* uint32_t and friends */ +#include <arpa/inet.h> /* htonls() and friends */ +#include <netinet/in.h> /* INET stuff */ +#include <netinet/udp.h> /* UDP stuff */ +#include <string.h> /* memcpy() */ +#include <unistd.h> /* close() */ + +#include "udp.h" +#include "common.h" +#include "net-const.h" +#include "req.h" +#include "parse.h" + + +static void udp_mini_reply(struct req_info *req, uint32_t reply); +static void udp_reply_err(struct req_info *req, uint32_t reply); +static void udp_reply_get(struct req_info *req, uint32_t reply, + unsigned char *val, size_t vsize); +static void udp_reply_set(struct req_info *req, uint32_t reply); +static void udp_reply_del(struct req_info *req, uint32_t reply); +static void udp_reply_cas(struct req_info *req, uint32_t reply); + + +/* + * Miscelaneous helper functions + */ + +static void rep_send_error(const struct req_info *req, const unsigned int code) +{ + int r, c; + unsigned char minibuf[3 * 4]; + + if (settings.passive) + return; + + /* Network format: ID (4), REP_ERR (4), error code (4) */ + r = htonl(REP_ERR); + c = htonl(code); + memcpy(minibuf, &(req->id), 4); + memcpy(minibuf + 4, &r, 4); + memcpy(minibuf + 8, &c, 4); + + /* If this send fails, there's nothing to be done */ + r = sendto(req->fd, minibuf, 3 * 4, 0, req->clisa, req->clilen); + + if (r < 0) { + perror("rep_send_error() failed"); + } +} + + +static int rep_send(const struct req_info *req, const unsigned char *buf, + const size_t size) +{ + int rv; + + if (settings.passive) + return 1; + + rv = sendto(req->fd, buf, size, 0, req->clisa, req->clilen); + if (rv < 0) { + rep_send_error(req, ERR_SEND); + return 0; + } + return 1; +} + + +/* Send small replies, consisting in only a value. */ +static void udp_mini_reply(struct req_info *req, uint32_t reply) +{ + /* We use a mini buffer to speedup the small replies, to avoid the + * malloc() overhead. */ + unsigned char minibuf[8]; + + if (settings.passive) + return; + + reply = htonl(reply); + memcpy(minibuf, &(req->id), 4); + memcpy(minibuf + 4, &reply, 4); + rep_send(req, minibuf, 8); + return; +} + + +/* The udp_reply_* functions are used by the db code to send the network + * replies. */ + +void udp_reply_err(struct req_info *req, uint32_t reply) +{ + rep_send_error(req, reply); +} + +void udp_reply_get(struct req_info *req, uint32_t reply, + unsigned char *val, size_t vsize) +{ + if (val == NULL) { + /* miss */ + udp_mini_reply(req, reply); + } else { + unsigned char *buf; + size_t bsize; + uint32_t t; + + reply = htonl(reply); + + /* The reply length is: + * 4 id + * 4 reply code + * 4 vsize + * vsize val + */ + bsize = 4 + 4 + 4 + vsize; + buf = malloc(bsize); + + t = htonl(vsize); + + memcpy(buf, &(req->id), 4); + memcpy(buf + 4, &reply, 4); + memcpy(buf + 8, &t, 4); + memcpy(buf + 12, val, vsize); + + rep_send(req, buf, bsize); + free(buf); + } + return; + +} + + +void udp_reply_set(struct req_info *req, uint32_t reply) +{ + udp_mini_reply(req, reply); +} + + +void udp_reply_del(struct req_info *req, uint32_t reply) +{ + udp_mini_reply(req, reply); +} + +void udp_reply_cas(struct req_info *req, uint32_t reply) +{ + udp_mini_reply(req, reply); +} + + +/* + * Main functions for receiving and parsing + */ + +int udp_init(void) +{ + int fd, rv; + struct sockaddr_in srvsa; + struct in_addr ia; + + rv = inet_pton(AF_INET, settings.udp_addr, &ia); + if (rv <= 0) + return -1; + + srvsa.sin_family = AF_INET; + srvsa.sin_addr.s_addr = ia.s_addr; + srvsa.sin_port = htons(settings.udp_port); + + fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) + return -1; + + rv = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &rv, sizeof(rv)) < 0 ) { + close(fd); + return -1; + } + + rv = bind(fd, (struct sockaddr *) &srvsa, sizeof(srvsa)); + if (rv < 0) + return -1; + + return fd; +} + + +void udp_close(int fd) +{ + close(fd); +} + + +/* Called by libevent for each receive event */ +void udp_recv(int fd, short event, void *arg) +{ + int rv; + struct req_info req; + struct sockaddr_in clisa; + socklen_t clilen; + size_t bsize; + + /* Allocate enough to hold the max msg length of 66000 bytes. + * Originally, this was malloc()ed, but using the stack made it go + * from 27 usec for each set operation, to 23 usec. While it may sound + * worthless, it made test1 go from 3.213s to 2.345s for 37618 + * operations. + * TODO: check for negative impacts (beside being ugly, obviously) + */ + unsigned char buf[68 * 1024]; + bsize = 68 * 1024; + + clilen = sizeof(clisa); + + rv = recvfrom(fd, buf, bsize, 0, (struct sockaddr *) &clisa, + &clilen); + if (rv <= 0) { + /* rv == 0 means "return of an undeliverable message", which + * we ignore; -1 means other error. */ + goto exit; + } + + if (rv < 2) { + stats.net_broken_req++; + goto exit; + } + + req.fd = fd; + req.type = REQTYPE_UDP; + req.clisa = (struct sockaddr *) &clisa; + req.clilen = clilen; + req.mini_reply = udp_mini_reply; + req.reply_err = udp_reply_err; + req.reply_get = udp_reply_get; + req.reply_set = udp_reply_set; + req.reply_del = udp_reply_del; + req.reply_cas = udp_reply_cas; + + /* parse the message */ + parse_message(&req, buf, rv); + +exit: + return; +} + + diff --git a/nmdb/udp.h b/nmdb/udp.h new file mode 100644 index 0000000..ea473cf --- /dev/null +++ b/nmdb/udp.h @@ -0,0 +1,10 @@ + +#ifndef _UDP_H +#define _UDP_H + +int udp_init(void); +void udp_close(int fd); +void udp_recv(int fd, short event, void *arg); + +#endif +