dovecot-2.2: dsync: Commit large transactions every 100 new messages.

dovecot at dovecot.org dovecot at dovecot.org
Mon Apr 8 18:14:45 EEST 2013


details:   http://hg.dovecot.org/dovecot-2.2/rev/548e59794f2e
changeset: 16255:548e59794f2e
user:      Timo Sirainen <tss at iki.fi>
date:      Mon Apr 08 18:14:32 2013 +0300
description:
dsync: Commit large transactions every 100 new messages.
This way, if dsync crashes or a transaction fails in the middle, the next
run can finish faster. Rolling back also finishes faster.

diffstat:

 src/doveadm/dsync/dsync-mailbox-import.c |  187 ++++++++++++++++++++----------
 1 files changed, 122 insertions(+), 65 deletions(-)

diffs (truncated from 339 to 300 lines):

diff -r e0acf38f6199 -r 548e59794f2e src/doveadm/dsync/dsync-mailbox-import.c
--- a/src/doveadm/dsync/dsync-mailbox-import.c	Mon Apr 08 17:10:14 2013 +0300
+++ b/src/doveadm/dsync/dsync-mailbox-import.c	Mon Apr 08 18:14:32 2013 +0300
@@ -13,6 +13,8 @@
 #include "dsync-mailbox.h"
 #include "dsync-mailbox-import.h"
 
+#define DSYNC_COMMIT_MSGS_INTERVAL 100
+
 struct importer_mail {
 	const char *guid;
 	uint32_t uid;
@@ -79,6 +81,7 @@
 
 	ARRAY(struct importer_new_mail *) newmails;
 	ARRAY_TYPE(uint32_t) wanted_uids;
+	ARRAY_TYPE(uint32_t) saved_uids;
 	uint32_t highest_wanted_uid;
 
 	ARRAY(struct dsync_mail_request) mail_requests;
@@ -104,6 +107,8 @@
 static void dsync_mailbox_save_newmails(struct dsync_mailbox_importer *importer,
 					const struct dsync_mail *mail,
 					struct importer_new_mail *all_newmails);
+static int dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer,
+				       bool final);
 
 static void
 dsync_mailbox_import_search_init(struct dsync_mailbox_importer *importer)
@@ -128,6 +133,22 @@
 	importer->cur_mail_skip = TRUE;
 }
 
+static void
+dsync_mailbox_import_transaction_begin(struct dsync_mailbox_importer *importer)
+{
+	const enum mailbox_transaction_flags ext_trans_flags =
+		MAILBOX_TRANSACTION_FLAG_SYNC |
+		MAILBOX_TRANSACTION_FLAG_EXTERNAL |
+		MAILBOX_TRANSACTION_FLAG_ASSIGN_UIDS;
+
+	importer->trans = mailbox_transaction_begin(importer->box,
+						MAILBOX_TRANSACTION_FLAG_SYNC);
+	importer->ext_trans = mailbox_transaction_begin(importer->box,
+							ext_trans_flags);
+	importer->mail = mail_alloc(importer->trans, 0, NULL);
+	importer->ext_mail = mail_alloc(importer->ext_trans, 0, NULL);
+}
+
 struct dsync_mailbox_importer *
 dsync_mailbox_import_init(struct mailbox *box,
 			  struct dsync_transaction_log_scan *log_scan,
@@ -140,10 +161,6 @@
 			  uint64_t remote_highest_pvt_modseq,
 			  enum dsync_mailbox_import_flags flags)
 {
-	const enum mailbox_transaction_flags ext_trans_flags =
-		MAILBOX_TRANSACTION_FLAG_SYNC |
-		MAILBOX_TRANSACTION_FLAG_EXTERNAL |
-		MAILBOX_TRANSACTION_FLAG_ASSIGN_UIDS;
 	struct dsync_mailbox_importer *importer;
 	struct mailbox_status status;
 	pool_t pool;
@@ -170,12 +187,9 @@
 	i_array_init(&importer->maybe_saves, 128);
 	i_array_init(&importer->newmails, 128);
 	i_array_init(&importer->wanted_uids, 128);
+	i_array_init(&importer->saved_uids, 128);
 
-	importer->trans = mailbox_transaction_begin(importer->box,
-		MAILBOX_TRANSACTION_FLAG_SYNC);
-	importer->ext_trans = mailbox_transaction_begin(box, ext_trans_flags);
-	importer->mail = mail_alloc(importer->trans, 0, NULL);
-	importer->ext_mail = mail_alloc(importer->ext_trans, 0, NULL);
+	dsync_mailbox_import_transaction_begin(importer);
 
 	if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_WANT_MAIL_REQUESTS) != 0) {
 		i_array_init(&importer->mail_requests, 128);
@@ -1498,12 +1512,21 @@
 }
 
 static void
-dsync_mailbox_import_want_uid(struct dsync_mailbox_importer *importer,
-			      uint32_t uid)
+dsync_mailbox_import_saved_uid(struct dsync_mailbox_importer *importer,
+			       uint32_t uid)
 {
+	i_assert(importer->search_ctx == NULL);
+
 	if (importer->highest_wanted_uid < uid)
 		importer->highest_wanted_uid = uid;
 	array_append(&importer->wanted_uids, &uid, 1);
+
+	/* commit the transaction once in a while, so if we fail we don't
+	   rollback everything. */
+	if (array_count(&importer->wanted_uids) % DSYNC_COMMIT_MSGS_INTERVAL == 0) {
+		if (dsync_mailbox_import_commit(importer, FALSE) < 0)
+			importer->failed = TRUE;
+	}
 }
 
 static bool
@@ -1522,7 +1545,7 @@
 	mailbox_save_set_uid(save_ctx, new_uid);
 	if (mailbox_move(&save_ctx, importer->mail) < 0)
 		return FALSE;
-	dsync_mailbox_import_want_uid(importer, new_uid);
+	dsync_mailbox_import_saved_uid(importer, new_uid);
 	return TRUE;
 }
 
@@ -1737,6 +1760,15 @@
 	/* save mails from local sources where possible,
 	   request the rest from remote */
 	dsync_mailbox_import_handle_local_mails(importer);
+
+	if (importer->search_ctx != NULL) {
+		if (mailbox_search_deinit(&importer->search_ctx) < 0) {
+			i_error("Mailbox %s: Search failed: %s",
+				mailbox_get_vname(importer->box),
+				mailbox_get_last_error(importer->box, NULL));
+			importer->failed = TRUE;
+		}
+	}
 }
 
 const struct dsync_mail_request *
@@ -1872,7 +1904,7 @@
 	}
 	if (ret > 0) {
 		i_assert(save_ctx == NULL);
-		dsync_mailbox_import_want_uid(importer, newmail->final_uid);
+		dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
 		return;
 	}
 	/* fallback to saving from remote stream */
@@ -1920,8 +1952,8 @@
 				mailbox_get_last_error(importer->box, NULL));
 			importer->failed = TRUE;
 		} else {
-			dsync_mailbox_import_want_uid(importer,
-						      newmail->final_uid);
+			dsync_mailbox_import_saved_uid(importer,
+						       newmail->final_uid);
 		}
 	}
 }
@@ -2031,24 +2063,22 @@
 
 static int
 reassign_unwanted_uids(struct dsync_mailbox_importer *importer,
-		       const struct mail_transaction_commit_changes *changes,
 		       bool *changes_during_sync_r)
 {
 	ARRAY_TYPE(seq_range) unwanted_uids;
-	struct seq_range_iter iter;
-	const uint32_t *wanted_uids;
-	uint32_t saved_uid, highest_seen_uid;
-	unsigned int i, n, wanted_count;
+	const uint32_t *wanted_uids, *saved_uids;
+	uint32_t highest_seen_uid;
+	unsigned int i, wanted_count, saved_count;
 	int ret = 0;
 
 	wanted_uids = array_get(&importer->wanted_uids, &wanted_count);
-	if (wanted_count == 0) {
-		i_assert(array_count(&changes->saved_uids) == 0);
+	saved_uids = array_get(&importer->saved_uids, &saved_count);
+	i_assert(wanted_count == saved_count);
+	if (wanted_count == 0)
 		return 0;
-	}
 	/* wanted_uids contains the UIDs we tried to save mails with.
 	   if nothing changed during dsync, we should have the expected UIDs
-	   (changes->saved_uids) and all is well.
+	   (saved_uids) and all is well.
 
 	   if any new messages got inserted during dsync, we'll need to fix up
 	   the UIDs and let the next dsync fix up the other side. for example:
@@ -2069,14 +2099,11 @@
 	i_assert(importer->local_uid_next <= highest_seen_uid);
 	seq_range_array_add_range(&unwanted_uids,
 				  importer->local_uid_next, highest_seen_uid);
-	seq_range_array_iter_init(&iter, &changes->saved_uids); i = n = 0;
-	while (seq_range_array_iter_nth(&iter, n++, &saved_uid)) {
+	for (i = 0; i < wanted_count; i++) {
 		i_assert(i < wanted_count);
-		if (saved_uid == wanted_uids[i])
-			seq_range_array_remove(&unwanted_uids, saved_uid);
-		i++;
+		if (saved_uids[i] == wanted_uids[i])
+			seq_range_array_remove(&unwanted_uids, saved_uids[i]);
 	}
-	i_assert(i == wanted_count);
 
 	ret = reassign_uids_in_seq_range(importer->box, &unwanted_uids);
 	if (ret == 0) {
@@ -2088,12 +2115,17 @@
 	return ret < 0 ? -1 : 0;
 }
 
-static int dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer,
-				       bool *changes_during_sync_r)
+static int
+dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer, bool final)
 {
 	struct mail_transaction_commit_changes changes;
-	struct mailbox_update update;
-	int ret = 0;
+	struct seq_range_iter iter;
+	uint32_t uid;
+	unsigned int n;
+	int ret = importer->failed ? -1 : 0;
+
+	mail_free(&importer->mail);
+	mail_free(&importer->ext_mail);
 
 	/* commit saves */
 	if (mailbox_transaction_commit_get_changes(&importer->ext_trans,
@@ -2101,33 +2133,59 @@
 		i_error("Mailbox %s: Save commit failed: %s",
 			mailbox_get_vname(importer->box),
 			mailbox_get_last_error(importer->box, NULL));
+		/* remove wanted_uids that weren't actually saved */
+		array_delete(&importer->wanted_uids,
+			     array_count(&importer->saved_uids),
+			     array_count(&importer->wanted_uids) -
+			     array_count(&importer->saved_uids));
 		mailbox_transaction_rollback(&importer->trans);
-		return -1;
+		ret = -1;
+	} else {
+		/* remember the UIDs that were successfully saved */
+		seq_range_array_iter_init(&iter, &changes.saved_uids); n = 0;
+		while (seq_range_array_iter_nth(&iter, n++, &uid))
+			array_append(&importer->saved_uids, &uid, 1);
+		pool_unref(&changes.pool);
+
+		/* commit flag changes and expunges */
+		if (mailbox_transaction_commit(&importer->trans) < 0) {
+			i_error("Mailbox %s: Commit failed: %s",
+				mailbox_get_vname(importer->box),
+				mailbox_get_last_error(importer->box, NULL));
+			ret = -1;
+		}
 	}
 
-	/* commit flag changes and expunges */
-	if (mailbox_transaction_commit(&importer->trans) < 0) {
-		i_error("Mailbox %s: Commit failed: %s",
-			mailbox_get_vname(importer->box),
-			mailbox_get_last_error(importer->box, NULL));
-		pool_unref(&changes.pool);
-		return -1;
-	}
+	if (!final)
+		dsync_mailbox_import_transaction_begin(importer);
+	return ret;
+}
 
-	/* update mailbox metadata. */
-	memset(&update, 0, sizeof(update));
-	update.min_next_uid = importer->remote_uid_next;
-	update.min_first_recent_uid =
-		I_MIN(importer->last_common_uid+1,
-		      importer->remote_first_recent_uid);
-	update.min_highest_modseq = importer->remote_highest_modseq;
-	update.min_highest_pvt_modseq = importer->remote_highest_pvt_modseq;
+static int dsync_mailbox_import_finish(struct dsync_mailbox_importer *importer,
+				       bool *changes_during_sync_r)
+{
+	struct mailbox_update update;
+	int ret;
 
-	if (mailbox_update(importer->box, &update) < 0) {
-		i_error("Mailbox %s: Update failed: %s",
-			mailbox_get_vname(importer->box),
-			mailbox_get_last_error(importer->box, NULL));
-		ret = -1;
+	ret = dsync_mailbox_import_commit(importer, TRUE);
+
+	if (ret == 0) {
+		/* update mailbox metadata if we successfully saved
+		   everything. */
+		memset(&update, 0, sizeof(update));
+		update.min_next_uid = importer->remote_uid_next;
+		update.min_first_recent_uid =
+			I_MIN(importer->last_common_uid+1,
+			      importer->remote_first_recent_uid);
+		update.min_highest_modseq = importer->remote_highest_modseq;
+		update.min_highest_pvt_modseq = importer->remote_highest_pvt_modseq;
+
+		if (mailbox_update(importer->box, &update) < 0) {
+			i_error("Mailbox %s: Update failed: %s",
+				mailbox_get_vname(importer->box),
+				mailbox_get_last_error(importer->box, NULL));
+			ret = -1;
+		}
 	}
 


More information about the dovecot-cvs mailing list