dovecot-2.0: dict: Added support for async commits. Changed dict_atomic_inc() behavior.
dovecot at dovecot.org
dovecot at dovecot.org
Mon Sep 7 03:55:59 EEST 2009
details: http://hg.dovecot.org/dovecot-2.0/rev/0f7b25f3e2ce
changeset: 9889:0f7b25f3e2ce
user: Timo Sirainen <tss at iki.fi>
date: Sun Sep 06 20:51:25 2009 -0400
description:
dict: Added support for async commits. Changed dict_atomic_inc() behavior.
diffstat:
10 files changed, 353 insertions(+), 105 deletions(-)
src/dict/dict-commands.c | 73 +++++++++++---
src/dict/dict-connection.h | 1
src/lib-dict/dict-client.c | 211 +++++++++++++++++++++++++++++++++++--------
src/lib-dict/dict-client.h | 4
src/lib-dict/dict-db.c | 15 ++-
src/lib-dict/dict-file.c | 29 ++++-
src/lib-dict/dict-private.h | 5 -
src/lib-dict/dict-sql.c | 87 +++++++++++------
src/lib-dict/dict.c | 13 ++
src/lib-dict/dict.h | 20 +++-
diffs (truncated from 860 to 300 lines):
diff -r 551c273f4844 -r 0f7b25f3e2ce src/dict/dict-commands.c
--- a/src/dict/dict-commands.c Sun Sep 06 20:42:42 2009 -0400
+++ b/src/dict/dict-commands.c Sun Sep 06 20:51:25 2009 -0400
@@ -54,7 +54,8 @@ static int cmd_iterate_flush(struct dict
o_stream_cork(conn->output);
while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) {
str_truncate(str, 0);
- str_printfa(str, "%s\t%s\n", key, value);
+ str_printfa(str, "%c%s\t%s\n", DICT_PROTOCOL_REPLY_OK,
+ key, value);
o_stream_send(conn->output, str_data(str), str_len(str));
if (o_stream_get_buffer_used_size(conn->output) >
@@ -154,6 +155,7 @@ static int cmd_begin(struct dict_connect
/* <id> */
trans = array_append_space(&conn->transactions);
trans->id = id;
+ trans->conn = conn;
trans->ctx = dict_transaction_begin(conn->dict);
return 0;
}
@@ -182,23 +184,56 @@ static int cmd_commit(struct dict_connec
static int cmd_commit(struct dict_connection *conn, const char *line)
{
struct dict_connection_transaction *trans;
+ char chr;
+ int ret;
+
+ if (conn->iter_ctx != NULL) {
+ i_error("dict client: COMMIT: Can't commit while iterating");
+ return -1;
+ }
+
+ if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
+ return -1;
+
+ ret = dict_transaction_commit(&trans->ctx);
+ switch (ret) {
+ case 1:
+ chr = DICT_PROTOCOL_REPLY_OK;
+ break;
+ case 0:
+ chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+ break;
+ default:
+ chr = DICT_PROTOCOL_REPLY_FAIL;
+ break;
+ }
+ o_stream_send_str(conn->output, t_strdup_printf("%c\n", chr));
+ dict_connection_transaction_array_remove(conn, trans);
+ return 0;
+}
+
+static void cmd_commit_async_callback(int ret, void *context)
+{
+ struct dict_connection_transaction *trans = context;
const char *reply;
- int ret;
-
- if (conn->iter_ctx != NULL) {
- i_error("dict client: COMMIT: Can't commit while iterating");
- return -1;
- }
-
- if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
- return -1;
-
- ret = dict_transaction_commit(&trans->ctx);
- reply = t_strdup_printf("%c\n", ret == 0 ? DICT_PROTOCOL_REPLY_OK :
- DICT_PROTOCOL_REPLY_FAIL);
- o_stream_send_str(conn->output, reply);
- dict_connection_transaction_array_remove(conn, trans);
- return 0;
+ char chr;
+
+ switch (ret) {
+ case 1:
+ chr = DICT_PROTOCOL_REPLY_OK;
+ break;
+ case 0:
+ chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+ break;
+ default:
+ chr = DICT_PROTOCOL_REPLY_FAIL;
+ break;
+ }
+ reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
+ chr, trans->id);
+ o_stream_send_str(trans->conn->output, reply);
+
+ dict_connection_transaction_array_remove(trans->conn, trans);
}
static int
@@ -214,8 +249,8 @@ cmd_commit_async(struct dict_connection
if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
return -1;
- dict_transaction_commit_async(&trans->ctx);
- dict_connection_transaction_array_remove(conn, trans);
+ dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
+ trans);
return 0;
}
diff -r 551c273f4844 -r 0f7b25f3e2ce src/dict/dict-connection.h
--- a/src/dict/dict-connection.h Sun Sep 06 20:42:42 2009 -0400
+++ b/src/dict/dict-connection.h Sun Sep 06 20:51:25 2009 -0400
@@ -5,6 +5,7 @@
struct dict_connection_transaction {
unsigned int id;
+ struct dict_connection *conn;
struct dict_transaction_context *ctx;
};
diff -r 551c273f4844 -r 0f7b25f3e2ce src/lib-dict/dict-client.c
--- a/src/lib-dict/dict-client.c Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-client.c Sun Sep 06 20:51:25 2009 -0400
@@ -1,6 +1,7 @@
/* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
#include "lib.h"
+#include "llist.h"
#include "str.h"
#include "network.h"
#include "istream.h"
@@ -8,6 +9,7 @@
#include "dict-private.h"
#include "dict-client.h"
+#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
@@ -24,9 +26,13 @@ struct client_dict {
time_t last_connect_try;
struct istream *input;
struct ostream *output;
+ struct io *io;
+
+ struct client_dict_transaction_context *transactions;
unsigned int connect_counter;
unsigned int transaction_id_counter;
+ unsigned int async_commits;
unsigned int in_iteration:1;
unsigned int handshaked:1;
@@ -41,6 +47,11 @@ struct client_dict_iterate_context {
struct client_dict_transaction_context {
struct dict_transaction_context ctx;
+ struct client_dict_transaction_context *prev, *next;
+
+ /* for async commits */
+ dict_transaction_commit_callback_t *callback;
+ void *context;
unsigned int id;
unsigned int connect_counter;
@@ -213,29 +224,97 @@ client_dict_send_transaction_query(struc
return 0;
}
-static char *client_dict_read_line(struct client_dict *dict)
-{
+static struct client_dict_transaction_context *
+client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+{
+ struct client_dict_transaction_context *ctx;
+
+ for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
+ if (ctx->id == id)
+ return ctx;
+ }
+ return NULL;
+}
+
+static void
+client_dict_finish_transaction(struct client_dict *dict,
+ unsigned int id, int ret)
+{
+ struct client_dict_transaction_context *ctx;
+
+ ctx = client_dict_transaction_find(dict, id);
+ if (ctx == NULL) {
+ i_error("dict-client: Unknown transaction id %u", id);
+ return;
+ }
+ if (ctx->callback != NULL)
+ ctx->callback(ret, ctx->context);
+
+ DLLIST_REMOVE(&dict->transactions, ctx);
+ i_free(ctx);
+
+ i_assert(dict->async_commits > 0);
+ if (--dict->async_commits == 0)
+ io_remove(&dict->io);
+}
+
+static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
+{
+ unsigned int id;
char *line;
int ret;
- line = i_stream_next_line(dict->input);
- if (line != NULL)
- return line;
-
- while ((ret = i_stream_read(dict->input)) > 0) {
- line = i_stream_next_line(dict->input);
- if (line != NULL)
- return line;
- }
- i_assert(ret < 0);
-
- if (ret == -2)
- i_error("read(%s) returned too much data", dict->path);
- else if (dict->input->stream_errno == 0)
- i_error("read(%s) failed: Remote disconnected", dict->path);
- else
- i_error("read(%s) failed: %m", dict->path);
- return NULL;
+ *line_r = NULL;
+ while ((line = i_stream_next_line(dict->input)) == NULL) {
+ ret = i_stream_read(dict->input);
+ switch (ret) {
+ case -1:
+ if (dict->input->stream_errno != 0)
+ i_error("read(%s) failed: %m", dict->path);
+ else {
+ i_error("read(%s) failed: Remote disconnected",
+ dict->path);
+ }
+ return -1;
+ case -2:
+ i_error("read(%s) returned too much data", dict->path);
+ return -1;
+ default:
+ i_assert(ret > 0);
+ break;
+ }
+ }
+ if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
+ switch (line[1]) {
+ case DICT_PROTOCOL_REPLY_OK:
+ ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL:
+ ret = -1;
+ break;
+ default:
+ i_error("dict-client: Invalid async commit line: %s",
+ line);
+ return 0;
+ }
+ id = strtoul(line+2, NULL, 10);
+ client_dict_finish_transaction(dict, id, ret);
+ return 0;
+ }
+ *line_r = line;
+ return 1;
+}
+
+static char *client_dict_read_line(struct client_dict *dict)
+{
+ char *line;
+
+ while (client_dict_read_one_line(dict, &line) == 0)
+ ;
+ return line;
}
static int client_dict_connect(struct client_dict *dict)
@@ -263,6 +342,7 @@ static int client_dict_connect(struct cl
dict->input->blocking = TRUE;
dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
dict->transaction_id_counter = 0;
+ dict->async_commits = 0;
query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
DICT_PROTOCOL_CMD_HELLO,
@@ -283,6 +363,8 @@ static void client_dict_disconnect(struc
dict->connect_counter++;
dict->handshaked = FALSE;
+ if (dict->io != NULL)
+ io_remove(&dict->io);
if (dict->input != NULL)
i_stream_destroy(&dict->input);
if (dict->output != NULL)
@@ -339,6 +421,21 @@ static void client_dict_deinit(struct di
pool_unref(&dict->pool);
}
More information about the dovecot-cvs
mailing list