dovecot-2.1: Initial implementation of dsync-based replication.

dovecot at dovecot.org dovecot at dovecot.org
Sun Mar 4 09:50:33 EET 2012


details:   http://hg.dovecot.org/dovecot-2.1/rev/14ff849dc266
changeset: 14225:14ff849dc266
user:      Timo Sirainen <tss at iki.fi>
date:      Sun Mar 04 09:50:21 2012 +0200
description:
Initial implementation of dsync-based replication.

diffstat:

 .hgignore                                          |    2 +
 configure.in                                       |    4 +
 src/Makefile.am                                    |    1 +
 src/doveadm/dsync/doveadm-dsync.c                  |    1 +
 src/lib-storage/mail-user.h                        |    2 +
 src/plugins/replication/Makefile.am                |   25 +
 src/plugins/replication/replication-plugin.c       |  353 ++++++++++++++++++++
 src/plugins/replication/replication-plugin.h       |    9 +
 src/replication/Makefile.am                        |    4 +
 src/replication/aggregator/Makefile.am             |   26 +
 src/replication/aggregator/aggregator-settings.c   |   85 ++++
 src/replication/aggregator/aggregator-settings.h   |   12 +
 src/replication/aggregator/aggregator.c            |   75 ++++
 src/replication/aggregator/notify-connection.c     |  154 ++++++++
 src/replication/aggregator/notify-connection.h     |    9 +
 src/replication/aggregator/replicator-connection.c |  321 ++++++++++++++++++
 src/replication/aggregator/replicator-connection.h |   25 +
 src/replication/replication-common.h               |   30 +
 src/replication/replicator/Makefile.am             |   30 +
 src/replication/replicator/doveadm-connection.c    |  194 +++++++++++
 src/replication/replicator/doveadm-connection.h    |   20 +
 src/replication/replicator/notify-connection.c     |  197 +++++++++++
 src/replication/replicator/notify-connection.h     |   13 +
 src/replication/replicator/replicator-brain.c      |  164 +++++++++
 src/replication/replicator/replicator-brain.h      |   11 +
 src/replication/replicator/replicator-queue.c      |  363 +++++++++++++++++++++
 src/replication/replicator/replicator-queue.h      |   60 +++
 src/replication/replicator/replicator-settings.c   |   80 ++++
 src/replication/replicator/replicator-settings.h   |   15 +
 src/replication/replicator/replicator.c            |  117 ++++++
 30 files changed, 2402 insertions(+), 0 deletions(-)

diffs (truncated from 2559 to 300 lines):

diff -r a452e5f616a2 -r 14ff849dc266 .hgignore
--- a/.hgignore	Sun Mar 04 09:39:45 2012 +0200
+++ b/.hgignore	Sun Mar 04 09:50:21 2012 +0200
@@ -85,6 +85,8 @@
 src/plugins/fts-squat/squat-test
 src/pop3-login/pop3-login
 src/pop3/pop3
+src/replication/replicator/replicator
+src/replication/aggregator/aggregator
 src/util/gdbhelper
 src/util/listview
 src/util/maildirlock
diff -r a452e5f616a2 -r 14ff849dc266 configure.in
--- a/configure.in	Sun Mar 04 09:39:45 2012 +0200
+++ b/configure.in	Sun Mar 04 09:50:21 2012 +0200
@@ -2785,6 +2785,9 @@
 src/master/Makefile
 src/pop3/Makefile
 src/pop3-login/Makefile
+src/replication/Makefile
+src/replication/aggregator/Makefile
+src/replication/replicator/Makefile
 src/ssl-params/Makefile
 src/stats/Makefile
 src/util/Makefile
@@ -2803,6 +2806,7 @@
 src/plugins/notify/Makefile
 src/plugins/quota/Makefile
 src/plugins/imap-quota/Makefile
+src/plugins/replication/Makefile
 src/plugins/snarf/Makefile
 src/plugins/stats/Makefile
 src/plugins/imap-stats/Makefile
diff -r a452e5f616a2 -r 14ff849dc266 src/Makefile.am
--- a/src/Makefile.am	Sun Mar 04 09:39:45 2012 +0200
+++ b/src/Makefile.am	Sun Mar 04 09:50:21 2012 +0200
@@ -39,6 +39,7 @@
 	log \
 	config \
 	director \
+	replication \
 	util \
 	doveadm \
 	ssl-params \
diff -r a452e5f616a2 -r 14ff849dc266 src/doveadm/dsync/doveadm-dsync.c
--- a/src/doveadm/dsync/doveadm-dsync.c	Sun Mar 04 09:39:45 2012 +0200
+++ b/src/doveadm/dsync/doveadm-dsync.c	Sun Mar 04 09:50:21 2012 +0200
@@ -368,6 +368,7 @@
 	int lock_fd, ret = 0;
 
 	user->admin = TRUE;
+	user->dsyncing = TRUE;
 
 	/* create workers */
 	worker1 = dsync_worker_init_local(user, ctx->namespace_prefix,
diff -r a452e5f616a2 -r 14ff849dc266 src/lib-storage/mail-user.h
--- a/src/lib-storage/mail-user.h	Sun Mar 04 09:39:45 2012 +0200
+++ b/src/lib-storage/mail-user.h	Sun Mar 04 09:50:21 2012 +0200
@@ -56,6 +56,8 @@
 	unsigned int inbox_open_error_logged:1;
 	/* Fuzzy search works for this user (FTS enabled) */
 	unsigned int fuzzy_search:1;
+	/* We're running dsync */
+	unsigned int dsyncing:1;
 };
 
 struct mail_user_module_register {
diff -r a452e5f616a2 -r 14ff849dc266 src/plugins/replication/Makefile.am
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/replication/Makefile.am	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,25 @@
+# Build rules for the replication mail plugin (lib20_replication_plugin).
+
+# Header search path: core libraries, the replication-common headers and
+# the notify plugin API (notify-plugin.h).
+AM_CPPFLAGS = \
+	-I$(top_srcdir)/src/lib \
+	-I$(top_srcdir)/src/lib-mail \
+	-I$(top_srcdir)/src/lib-imap \
+	-I$(top_srcdir)/src/lib-index \
+	-I$(top_srcdir)/src/lib-storage \
+	-I$(top_srcdir)/src/replication \
+	-I$(top_srcdir)/src/plugins/notify
+
+# NOTE(review): empties NOPLUGIN_LDFLAGS for this directory, presumably
+# overriding a value meant for non-plugin binaries -- confirm.
+NOPLUGIN_LDFLAGS =
+lib20_replication_plugin_la_LDFLAGS = -module -avoid-version
+
+module_LTLIBRARIES = \
+	lib20_replication_plugin.la
+
+# The plugin depends on the notify plugin; link against it explicitly
+# only when DOVECOT_PLUGIN_DEPS is enabled.
+if DOVECOT_PLUGIN_DEPS
+lib20_replication_plugin_la_LIBADD = \
+	../notify/lib15_notify_plugin.la
+endif
+
+lib20_replication_plugin_la_SOURCES = \
+	replication-plugin.c
+
+noinst_HEADERS = \
+	replication-plugin.h
diff -r a452e5f616a2 -r 14ff849dc266 src/plugins/replication/replication-plugin.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/plugins/replication/replication-plugin.c	Sun Mar 04 09:50:21 2012 +0200
@@ -0,0 +1,353 @@
+/* Copyright (c) 2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "str.h"
+#include "strescape.h"
+#include "fd-set-nonblock.h"
+#include "ioloop.h"
+#include "network.h"
+#include "write-full.h"
+#include "mail-user.h"
+#include "mail-storage-private.h"
+#include "notify-plugin.h"
+#include "replication-common.h"
+#include "replication-plugin.h"
+
+#include <stdlib.h>
+
+/* Name of the UNIX socket used for synchronous "sync" requests.
+   NOTE(review): presumably combined with a base directory to build
+   replication_user.socket_path -- the code doing that is not shown. */
+#define REPLICATION_SOCKET_NAME "replication-notify"
+/* Name of the FIFO used for asynchronous low/high priority notifications.
+   NOTE(review): presumably the basename of replication_user.fifo_path. */
+#define REPLICATION_FIFO_NAME "replication-notify-fifo"
+/* NOTE(review): presumably the delay before a pending asynchronous
+   notification is sent (ruser->to) -- usage not visible here. */
+#define REPLICATION_NOTIFY_DELAY_MSECS 500
+/* NOTE(review): presumably the default for replication_user.sync_secs,
+   the alarm() timeout of replication_notify_sync() -- confirm. */
+#define REPLICATION_SYNC_TIMEOUT_SECS 10
+
+/* Fetch the struct replication_user attached to a struct mail_user. */
+#define REPLICATION_USER_CONTEXT(obj) \
+	MODULE_CONTEXT(obj, replication_user_module)
+
+/* Per-user state of the replication plugin, attached to struct mail_user
+   through replication_user_module (see REPLICATION_USER_CONTEXT()). */
+struct replication_user {
+	union mail_user_module_context module_ctx;
+
+	/* FIFO that LOW/HIGH priority notifications are written to */
+	const char *fifo_path;
+	/* UNIX socket used for synchronous "sync" requests */
+	const char *socket_path;
+
+	/* write-only fd for fifo_path, opened lazily; -1 while not open */
+	int fifo_fd;
+
+	/* pending notification timeout; removed once the notification has
+	   been sent or has failed */
+	struct timeout *to;
+	/* priority of the pending notification, REPLICATION_PRIORITY_NONE
+	   when nothing is pending */
+	enum replication_priority priority;
+	/* seconds to wait (via alarm()) for the reply to a "sync" request */
+	unsigned int sync_secs;
+
+	/* opening fifo_path failed; no further FIFO writes are attempted */
+	bool fifo_failed;
+};
+
+/* Per-transaction state of the replication plugin.
+   NOTE(review): the code using this struct is outside the visible diff;
+   field meanings below are inferred from names -- confirm. */
+struct replication_mail_txn_context {
+	/* presumably the user owning the transaction */
+	struct mail_user *user;
+	/* presumably set when the transaction added new messages */
+	bool new_messages;
+};
+
+/* Module context slot in mail_user_module_register; it is what
+   REPLICATION_USER_CONTEXT() looks up. */
+static MODULE_CONTEXT_DEFINE_INIT(replication_user_module,
+				  &mail_user_module_register);
+
+/* Queue an asynchronous replication request for the user by writing
+   "<username>\t<priority>\n" to the replication FIFO.
+
+   The FIFO is opened lazily on first use and kept open in ruser->fifo_fd.
+   If opening fails, ruser->fifo_failed is set and every later call
+   returns -1 immediately.
+
+   priority must be REPLICATION_PRIORITY_LOW or REPLICATION_PRIORITY_HIGH;
+   NONE and SYNC cannot be sent through the FIFO.
+
+   Returns 1 when the whole line was written, 0 when the FIFO is full and
+   the caller should try again later, and -1 on error. After a write error
+   the FIFO is closed so that the next call reopens it. */
+static int
+replication_fifo_notify(struct mail_user *user,
+			enum replication_priority priority)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+	string_t *str;
+	ssize_t ret;
+
+	if (ruser->fifo_failed)
+		return -1;
+	if (ruser->fifo_fd == -1) {
+		/* A blocking O_WRONLY open() of a FIFO waits until something
+		   opens it for reading, which could hang this process.
+		   With O_NONBLOCK it fails with ENXIO instead, and the fd
+		   is already non-blocking for the write() below. */
+		ruser->fifo_fd = open(ruser->fifo_path, O_WRONLY | O_NONBLOCK);
+		if (ruser->fifo_fd == -1) {
+			i_error("open(%s) failed: %m", ruser->fifo_path);
+			ruser->fifo_failed = TRUE;
+			return -1;
+		}
+	}
+	/* <username> \t <priority> */
+	str = t_str_new(256);
+	str_tabescape_write(str, user->username);
+	str_append_c(str, '\t');
+	switch (priority) {
+	case REPLICATION_PRIORITY_NONE:
+	case REPLICATION_PRIORITY_SYNC:
+		i_unreached();
+	case REPLICATION_PRIORITY_LOW:
+		str_append(str, "low");
+		break;
+	case REPLICATION_PRIORITY_HIGH:
+		str_append(str, "high");
+		break;
+	}
+	str_append_c(str, '\n');
+	ret = write(ruser->fifo_fd, str_data(str), str_len(str));
+	if (ret == (ssize_t)str_len(str))
+		return 1;
+	if (ret < 0 && errno == EAGAIN) {
+		/* FIFO is full. On a non-blocking fd write() reports this
+		   as -1/EAGAIN (never as 0). busy, try again later */
+		return 0;
+	}
+	if (ret < 0)
+		i_error("write(%s) failed: %m", ruser->fifo_path);
+	else {
+		i_error("write(%s) wrote partial data",
+			ruser->fifo_path);
+	}
+	/* drop the fd so that the next call reopens the FIFO */
+	if (close(ruser->fifo_fd) < 0)
+		i_error("close(%s) failed: %m", ruser->fifo_path);
+	ruser->fifo_fd = -1;
+	return -1;
+}
+
+/* Send the user's pending LOW/HIGH priority notification through the FIFO.
+
+   A failed write (other than a failed open) is retried once, to cope with
+   the FIFO reader having gone away. If the FIFO is busy (0 returned), the
+   pending priority and its timeout are kept so the notification can be
+   retried. Otherwise the timeout is removed and nothing is pending any
+   more, whether the notification succeeded or failed. */
+static void replication_notify_now(struct mail_user *user)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+	int result;
+
+	i_assert(ruser->priority != REPLICATION_PRIORITY_NONE);
+	i_assert(ruser->priority != REPLICATION_PRIORITY_SYNC);
+
+	result = replication_fifo_notify(user, ruser->priority);
+	if (result < 0 && !ruser->fifo_failed) {
+		/* one more attempt: the replication server may have been
+		   restarted, which makes the old FIFO fd fail */
+		result = replication_fifo_notify(user, ruser->priority);
+	}
+	if (result == 0) {
+		/* FIFO busy - leave the notification pending */
+		return;
+	}
+
+	timeout_remove(&ruser->to);
+	ruser->priority = REPLICATION_PRIORITY_NONE;
+}
+
+/* Ask the replication server to sync the user right away and wait for
+   its answer.
+
+   Connects to ruser->socket_path, sends "<username>\tsync\n" and reads a
+   single reply. A reply starting with '+' means success. A reply starting
+   with '-' means failure, and the rest of the line is logged. Anything
+   else is logged as invalid input. The exchange is bounded by
+   alarm(ruser->sync_secs), and a read() interrupted with EINTR is
+   reported as a timeout.
+   NOTE(review): this relies on a SIGALRM handler being installed
+   elsewhere. With the default disposition, the alarm would terminate the
+   process instead of interrupting read() -- confirm.
+
+   Returns 0 on success, -1 on any failure. */
+static int replication_notify_sync(struct mail_user *user)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+	string_t *str;
+	char buf[1024];
+	int fd;
+	ssize_t ret;
+
+	fd = net_connect_unix(ruser->socket_path);
+	if (fd == -1) {
+		i_error("net_connect_unix(%s) failed: %m", ruser->socket_path);
+		return -1;
+	}
+	net_set_nonblock(fd, FALSE);
+
+	/* <username> \t "sync" */
+	str = t_str_new(256);
+	str_tabescape_write(str, user->username);
+	str_append(str, "\tsync\n");
+	alarm(ruser->sync_secs);
+	if (write_full(fd, str_data(str), str_len(str)) < 0) {
+		i_error("write(%s) failed: %m", ruser->socket_path);
+		ret = -1;
+	} else {
+		/* + | - */
+		ret = read(fd, buf, sizeof(buf));
+		if (ret < 0) {
+			/* the failure reason is in errno; ret is just -1 */
+			if (errno != EINTR) {
+				i_error("read(%s) failed: %m",
+					ruser->socket_path);
+			} else {
+				i_warning("replication(%s): Sync failure: "
+					  "Timeout in %u secs",
+					  user->username, ruser->sync_secs);
+			}
+		} else if (ret == 0) {
+			i_error("read(%s) failed: EOF", ruser->socket_path);
+			ret = -1;
+		} else if (buf[0] == '+') {
+			/* success */
+			ret = 0;
+		} else if (buf[0] == '-') {
+			/* failure */
+			if (buf[ret-1] == '\n') ret--;
+			i_warning("replication(%s): Sync failure: %s",
+				  user->username, t_strndup(buf+1, ret-1));
+			ret = -1;
+		} else {
+			i_warning("replication(%s): "
+				  "Remote sent invalid input: %s",
+				  user->username, t_strndup(buf, ret));
+			/* don't leak the positive byte count to the caller */
+			ret = -1;
+		}
+	}
+	alarm(0);
+	if (close(fd) < 0)
+		i_error("close(%s) failed: %m", ruser->socket_path);
+	return ret == 0 ? 0 : -1;
+}
+
+static void replication_notify(struct mail_user *user,
+			       enum replication_priority priority)
+{
+	struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+
+	if (user->dsyncing) {
+		/* we're running dsync, which means that the remote is telling
+		   us about a change. don't trigger a replication back to it */
+		return;
+	}
+
+	if (priority == REPLICATION_PRIORITY_SYNC) {
+		if (replication_notify_sync(user) == 0) {
+			timeout_remove(&ruser->to);
+			ruser->priority = REPLICATION_PRIORITY_NONE;
+			return;
+		}
+		/* sync replication failed, try as "high" via fifo */


More information about the dovecot-cvs mailing list