dovecot-2.1: redis dict: Added support for set/unset/atomic_inc.

dovecot at dovecot.org dovecot at dovecot.org
Mon Jul 23 17:24:26 EEST 2012


details:   http://hg.dovecot.org/dovecot-2.1/rev/54e2556f87ea
changeset: 14629:54e2556f87ea
user:      Timo Sirainen <tss at iki.fi>
date:      Mon Jul 23 17:24:13 2012 +0300
description:
redis dict: Added support for set/unset/atomic_inc.
This allows using Redis as dict quota backend.

diffstat:

 src/lib-dict/dict-redis.c |  504 +++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 451 insertions(+), 53 deletions(-)

diffs (truncated from 632 to 300 lines):

diff -r 522e03dd4268 -r 54e2556f87ea src/lib-dict/dict-redis.c
--- a/src/lib-dict/dict-redis.c	Mon Jul 23 17:23:37 2012 +0300
+++ b/src/lib-dict/dict-redis.c	Mon Jul 23 17:24:13 2012 +0300
@@ -1,6 +1,7 @@
 /* Copyright (c) 2008-2012 Dovecot authors, see the included COPYING redis */
 
 #include "lib.h"
+#include "array.h"
 #include "str.h"
 #include "istream.h"
 #include "ostream.h"
@@ -9,6 +10,20 @@
 
 #define REDIS_DEFAULT_PORT 6379
 #define REDIS_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30)
+#define DICT_USERNAME_SEPARATOR '/'
+
+enum redis_input_state {
+	/* expecting $-1 / $<size> followed by GET reply */
+	REDIS_INPUT_STATE_GET,
+	/* expecting +QUEUED */
+	REDIS_INPUT_STATE_MULTI,
+	/* expecting +OK reply for DISCARD */
+	REDIS_INPUT_STATE_DISCARD,
+	/* expecting *<nreplies> */
+	REDIS_INPUT_STATE_EXEC,
+	/* expecting EXEC reply */
+	REDIS_INPUT_STATE_EXEC_REPLY
+};
 
 struct redis_connection {
 	struct connection conn;
@@ -20,35 +35,208 @@
 	bool value_received;
 };
 
+struct redis_dict_reply {
+	unsigned int reply_count;
+	dict_transaction_commit_callback_t *callback;
+	void *context;
+};
+
 struct redis_dict {
 	struct dict dict;
 	struct ip_addr ip;
+	char *username, *key_prefix;
 	unsigned int port;
 	unsigned int timeout_msecs;
 
 	struct ioloop *ioloop;
 	struct redis_connection conn;
+
+	ARRAY_DEFINE(input_states, enum redis_input_state);
+	ARRAY_DEFINE(replies, struct redis_dict_reply);
+
 	bool connected;
+	bool transaction_open;
+};
+
+struct redis_dict_transaction_context {
+	struct dict_transaction_context ctx;
+	unsigned int cmd_count;
+	bool failed;
 };
 
 static struct connection_list *redis_connections;
 
+static void
+redis_input_state_add(struct redis_dict *dict, enum redis_input_state state)
+{
+	array_append(&dict->input_states, &state, 1);
+}
+
+static void redis_input_state_remove(struct redis_dict *dict)
+{
+	array_delete(&dict->input_states, 0, 1);
+}
+
 static void redis_conn_destroy(struct connection *_conn)
 {
 	struct redis_connection *conn = (struct redis_connection *)_conn;
+	const struct redis_dict_reply *reply;
 
 	conn->dict->connected = FALSE;
 	connection_disconnect(_conn);
+
+	array_foreach(&conn->dict->replies, reply) {
+		if (reply->callback != NULL)
+			reply->callback(-1, reply->context);
+	}
+	array_clear(&conn->dict->replies);
+	array_clear(&conn->dict->input_states);
+
 	if (conn->dict->ioloop != NULL)
 		io_loop_stop(conn->dict->ioloop);
 }
 
+static void redis_wait(struct redis_dict *dict)
+{
+	struct ioloop *prev_ioloop = current_ioloop;
+
+	i_assert(dict->ioloop == NULL);
+
+	dict->ioloop = io_loop_create();
+	connection_switch_ioloop(&dict->conn.conn);
+
+	do {
+		io_loop_run(dict->ioloop);
+	} while (array_count(&dict->input_states) > 0);
+
+	current_ioloop = prev_ioloop;
+	connection_switch_ioloop(&dict->conn.conn);
+	current_ioloop = dict->ioloop;
+	io_loop_destroy(&dict->ioloop);
+}
+
+static void redis_input_get(struct redis_connection *conn)
+{
+	const unsigned char *data;
+	size_t size;
+	const char *line;
+
+	if (conn->bytes_left == 0) {
+		/* read the size first */
+		line = i_stream_next_line(conn->conn.input);
+		if (line == NULL)
+			return;
+		if (strcmp(line, "$-1") == 0) {
+			conn->value_received = TRUE;
+			conn->value_not_found = TRUE;
+			if (conn->dict->ioloop != NULL)
+				io_loop_stop(conn->dict->ioloop);
+			redis_input_state_remove(conn->dict);
+			return;
+		}
+		if (line[0] != '$' || str_to_uint(line+1, &conn->bytes_left) < 0) {
+			i_error("redis: Unexpected input (wanted $size): %s",
+				line);
+			redis_conn_destroy(&conn->conn);
+			return;
+		}
+		conn->bytes_left += 2; /* include trailing CRLF */
+	}
+
+	data = i_stream_get_data(conn->conn.input, &size);
+	if (size > conn->bytes_left)
+		size = conn->bytes_left;
+	str_append_n(conn->last_reply, data, size);
+
+	conn->bytes_left -= size;
+	i_stream_skip(conn->conn.input, size);
+
+	if (conn->bytes_left == 0) {
+		/* reply fully read - drop trailing CRLF */
+		conn->value_received = TRUE;
+		str_truncate(conn->last_reply, str_len(conn->last_reply)-2);
+
+		if (conn->dict->ioloop != NULL)
+			io_loop_stop(conn->dict->ioloop);
+		redis_input_state_remove(conn->dict);
+	}
+}
+
+static int redis_conn_input_more(struct redis_connection *conn)
+{
+	struct redis_dict *dict = conn->dict;
+	struct redis_dict_reply *reply;
+	const enum redis_input_state *states;
+	enum redis_input_state state;
+	unsigned int count, num_replies;
+	const char *line;
+
+	states = array_get(&dict->input_states, &count);
+	if (count == 0) {
+		line = i_stream_next_line(conn->conn.input);
+		if (line == NULL) line = "";
+		i_error("redis: Unexpected input (expected nothing): %s",
+			line);
+		return -1;
+	}
+	state = states[0];
+	if (state == REDIS_INPUT_STATE_GET) {
+		redis_input_get(conn);
+		return 1;
+	}
+
+	line = i_stream_next_line(conn->conn.input);
+	if (line == NULL)
+		return 0;
+
+	redis_input_state_remove(dict);
+	switch (state) {
+	case REDIS_INPUT_STATE_GET:
+		i_unreached();
+	case REDIS_INPUT_STATE_MULTI:
+	case REDIS_INPUT_STATE_DISCARD:
+		if (line[0] != '+')
+			break;
+		return 1;
+	case REDIS_INPUT_STATE_EXEC:
+		if (line[0] != '*' || str_to_uint(line+1, &num_replies) < 0)
+			break;
+
+		reply = array_idx_modifiable(&dict->replies, 0);
+		i_assert(reply->reply_count > 0);
+		if (reply->reply_count != num_replies) {
+			i_error("redis: EXEC expected %u replies, not %u",
+				reply->reply_count, num_replies);
+			return -1;
+		}
+		return 1;
+	case REDIS_INPUT_STATE_EXEC_REPLY:
+		if (*line != '+' && *line != ':')
+			break;
+		/* success, just ignore the actual reply */
+		reply = array_idx_modifiable(&dict->replies, 0);
+		i_assert(reply->reply_count > 0);
+		if (--reply->reply_count == 0) {
+			if (reply->callback != NULL)
+				reply->callback(1, reply->context);
+			array_delete(&dict->replies, 0, 1);
+			/* if we're running in a dict-ioloop, we're handling a
+			   synchronous commit and need to stop now */
+			if (array_count(&dict->replies) == 0 &&
+			    conn->dict->ioloop != NULL)
+				io_loop_stop(conn->dict->ioloop);
+		}
+		return 1;
+	}
+	i_error("redis: Unexpected input (state=%d): %s", state, line);
+	return -1;
+}
+
 static void redis_conn_input(struct connection *_conn)
 {
 	struct redis_connection *conn = (struct redis_connection *)_conn;
-	const unsigned char *data;
 	size_t size;
-	const char *line;
+	int ret;
 
 	switch (i_stream_read(_conn->input)) {
 	case 0:
@@ -60,42 +248,13 @@
 		break;
 	}
 
-	if (conn->bytes_left == 0) {
-		/* read the size first */
-		line = i_stream_next_line(_conn->input);
-		if (line == NULL)
-			return;
-		if (strcmp(line, "$-1") == 0) {
-			conn->value_received = TRUE;
-			conn->value_not_found = TRUE;
-			if (conn->dict->ioloop != NULL)
-				io_loop_stop(conn->dict->ioloop);
-			return;
-		}
-		if (line[0] != '$' || str_to_uint(line+1, &conn->bytes_left) < 0) {
-			i_error("redis: Unexpected input (wanted $size): %s",
-				line);
-			redis_conn_destroy(_conn);
-			return;
-		}
-		conn->bytes_left += 2; /* include trailing CRLF */
+	while ((ret = redis_conn_input_more(conn)) > 0) {
+		i_stream_get_data(_conn->input, &size);
+		if (size == 0)
+			break;
 	}
-
-	data = i_stream_get_data(_conn->input, &size);
-	if (size > conn->bytes_left)
-		size = conn->bytes_left;
-	str_append_n(conn->last_reply, data, size);
-
-	conn->bytes_left -= size;
-	i_stream_skip(_conn->input, size);
-
-	if (conn->bytes_left == 0) {
-		/* drop trailing CRLF */
-		conn->value_received = TRUE;
-		str_truncate(conn->last_reply, str_len(conn->last_reply)-2);
-		if (conn->dict->ioloop != NULL)
-			io_loop_stop(conn->dict->ioloop);
-	}
+	if (ret < 0)
+		redis_conn_destroy(_conn);
 }
 
 static void redis_conn_connected(struct connection *_conn)
@@ -124,10 +283,30 @@
 	.connected = redis_conn_connected
 };
 
+static const char *redis_escape_username(const char *username)
+{
+	const char *p;
+	string_t *str = t_str_new(64);


More information about the dovecot-cvs mailing list