dovecot-2.0: dsync: Lots of updates and fixes.

dovecot at dovecot.org dovecot at dovecot.org
Thu Aug 6 03:30:54 EEST 2009


details:   http://hg.dovecot.org/dovecot-2.0/rev/d9a96da46d4a
changeset: 9736:d9a96da46d4a
user:      Timo Sirainen <tss at iki.fi>
date:      Wed Aug 05 20:30:43 2009 -0400
description:
dsync: Lots of updates and fixes.

diffstat:

25 files changed, 1449 insertions(+), 466 deletions(-)
src/dsync/dsync-brain-msgs-new.c        |  299 +++++++++++-------
src/dsync/dsync-brain-msgs.c            |   47 +-
src/dsync/dsync-brain-private.h         |   14 
src/dsync/dsync-brain.c                 |  237 ++++++++++----
src/dsync/dsync-brain.h                 |   10 
src/dsync/dsync-data.c                  |   28 -
src/dsync/dsync-data.h                  |   20 +
src/dsync/dsync-proxy-client.c          |  167 ++++++++--
src/dsync/dsync-proxy-server-cmd.c      |  134 +++++---
src/dsync/dsync-proxy-server.c          |    4 
src/dsync/dsync-proxy-server.h          |    2 
src/dsync/dsync-proxy.c                 |   92 +++--
src/dsync/dsync-proxy.h                 |    3 
src/dsync/dsync-worker-local.c          |  497 +++++++++++++++++++++++++++----
src/dsync/dsync-worker-private.h        |   10 
src/dsync/dsync-worker.c                |   41 ++
src/dsync/dsync-worker.h                |   21 +
src/dsync/dsync.c                       |   40 +-
src/dsync/test-dsync-brain-msgs.c       |   20 -
src/dsync/test-dsync-brain.c            |   78 +++-
src/dsync/test-dsync-common.c           |   11 
src/dsync/test-dsync-proxy-server-cmd.c |   74 ++++
src/dsync/test-dsync-proxy.c            |   10 
src/dsync/test-dsync-worker.c           |   54 +++
src/dsync/test-dsync-worker.h           |    2 

diffs (truncated from 3395 to 300 lines):

diff -r d354dc450c63 -r d9a96da46d4a src/dsync/dsync-brain-msgs-new.c
--- a/src/dsync/dsync-brain-msgs-new.c	Wed Aug 05 20:30:06 2009 -0400
+++ b/src/dsync/dsync-brain-msgs-new.c	Wed Aug 05 20:30:43 2009 -0400
@@ -2,6 +2,7 @@
 
 #include "lib.h"
 #include "array.h"
+#include "istream.h"
 #include "hash.h"
 #include "dsync-worker.h"
 #include "dsync-brain-private.h"
@@ -13,42 +14,36 @@ struct dsync_brain_msg_copy_context {
 
 struct dsync_brain_msg_save_context {
 	struct dsync_brain_msg_iter *iter;
-
-	mailbox_guid_t mailbox;
 	const struct dsync_message *msg;
 };
 
 static void
-dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync);
-
-static bool
-dsync_brain_msg_sync_is_save_done(struct dsync_brain_mailbox_sync *sync)
-{
-	return sync->src_msg_iter->copy_results_left == 0 &&
-		sync->dest_msg_iter->copy_results_left == 0 &&
-		sync->src_msg_iter->save_results_left == 0 &&
-		sync->dest_msg_iter->save_results_left == 0;
-}
+dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter);
 
 static void msg_get_callback(enum dsync_msg_get_result result,
-			     struct dsync_msg_static_data *data,
+			     const struct dsync_msg_static_data *data,
 			     void *context)
 {
 	struct dsync_brain_msg_save_context *ctx = context;
+	struct istream *input;
 
 	switch (result) {
 	case DSYNC_MSG_GET_RESULT_SUCCESS:
-		dsync_worker_select_mailbox(ctx->iter->worker, &ctx->mailbox);
+		input = data->input;
 		dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data);
+		i_stream_unref(&input);
 		break;
 	case DSYNC_MSG_GET_RESULT_EXPUNGED:
 		/* mail got expunged during sync. just skip this. */
 		break;
 	case DSYNC_MSG_GET_RESULT_FAILED:
+		i_error("msg-get failed: uid=%u guid=%s",
+			ctx->msg->uid, ctx->msg->guid);
 		dsync_brain_fail(ctx->iter->sync->brain);
 		break;
 	}
-	ctx->iter->save_results_left--;
+	if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
+		dsync_brain_msg_sync_add_new_msgs(ctx->iter);
 }
 
 static void dsync_brain_copy_callback(bool success, void *context)
@@ -57,7 +52,6 @@ static void dsync_brain_copy_callback(bo
 	const struct dsync_brain_new_msg *msg;
 	struct dsync_brain_guid_instance *inst;
 
-	ctx->iter->copy_results_left--;
 	if (!success) {
 		/* mark the guid instance invalid and try again later */
 		msg = array_idx(&ctx->iter->new_msgs, ctx->msg_idx);
@@ -66,17 +60,15 @@ static void dsync_brain_copy_callback(bo
 		array_append(&ctx->iter->copy_retry_indexes, &ctx->msg_idx, 1);
 	}
 
-	if (dsync_brain_msg_sync_is_save_done(ctx->iter->sync)) {
-		ctx->iter->sync->brain->state++;
-		dsync_brain_sync(ctx->iter->sync->brain);
-	}
+	if (--ctx->iter->copy_results_left == 0 && !ctx->iter->adding_msgs)
+		dsync_brain_msg_sync_add_new_msgs(ctx->iter);
 }
 
 static int
 dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter,
 				 const mailbox_guid_t *src_mailbox,
 				 unsigned int msg_idx,
-				 const struct dsync_message *msg)
+				 const struct dsync_brain_new_msg *msg)
 {
 	struct dsync_brain_msg_save_context *save_ctx;
 	struct dsync_brain_msg_copy_context *copy_ctx;
@@ -84,10 +76,9 @@ dsync_brain_msg_sync_add_new_msg(struct 
 	const struct dsync_brain_guid_instance *inst;
 	const struct dsync_brain_mailbox *inst_box;
 
-	inst = hash_table_lookup(dest_iter->guid_hash, msg->guid);
+	inst = hash_table_lookup(dest_iter->guid_hash, msg->msg->guid);
 	if (inst != NULL) {
 		/* we can save this by copying an existing message */
-		dsync_worker_select_mailbox(dest_iter->worker, src_mailbox);
 		inst_box = array_idx(&dest_iter->sync->mailboxes,
 				     inst->mailbox_idx);
 
@@ -96,9 +87,10 @@ dsync_brain_msg_sync_add_new_msg(struct 
 		copy_ctx->iter = dest_iter;
 		copy_ctx->msg_idx = msg_idx;
 
-		dsync_worker_msg_copy(dest_iter->worker, &inst_box->box.guid,
-				      inst->uid, msg, dsync_brain_copy_callback,
-				      copy_ctx);
+		dsync_worker_msg_copy(dest_iter->worker,
+				      &inst_box->box.mailbox_guid,
+				      inst->uid, msg->msg,
+				      dsync_brain_copy_callback, copy_ctx);
 		dest_iter->copy_results_left++;
 	} else {
 		src_iter = dest_iter == dest_iter->sync->dest_msg_iter ?
@@ -108,79 +100,78 @@ dsync_brain_msg_sync_add_new_msg(struct 
 		save_ctx = p_new(src_iter->sync->pool,
 				 struct dsync_brain_msg_save_context, 1);
 		save_ctx->iter = dest_iter;
-		save_ctx->mailbox = *src_mailbox;
-		save_ctx->msg = dsync_message_dup(src_iter->sync->pool, msg);
-
-		dsync_worker_select_mailbox(src_iter->worker, src_mailbox);
-		dsync_worker_msg_get(src_iter->worker, msg->uid,
-				     msg_get_callback, save_ctx);
+		save_ctx->msg = dsync_message_dup(src_iter->sync->pool,
+						  msg->msg);
+
+		dest_iter->adding_msgs = TRUE;
 		dest_iter->save_results_left++;
+		dsync_worker_msg_get(src_iter->worker, src_mailbox,
+				     msg->orig_uid, msg_get_callback, save_ctx);
+		dest_iter->adding_msgs = FALSE;
+		if (dsync_worker_output_flush(src_iter->worker) < 0)
+			return -1;
 	}
 	return dsync_worker_is_output_full(dest_iter->worker) ? 0 : 1;
 }
 
-static void
-dsync_brain_msg_iter_add_new_msgs(struct dsync_brain_msg_iter *dest_iter)
-{
-	const struct dsync_brain_mailbox *mailboxes, *mailbox;
+static bool
+dsync_brain_mailbox_add_new_msgs(struct dsync_brain_msg_iter *iter,
+				 const mailbox_guid_t *mailbox_guid)
+{
 	const struct dsync_brain_new_msg *msgs;
-	unsigned int i, mailbox_count, msg_count;
-
-	mailboxes = array_get(&dest_iter->sync->mailboxes, &mailbox_count);
-	msgs = array_get(&dest_iter->new_msgs, &msg_count);
-	for (i = dest_iter->next_new_msg; i < msg_count; i++) {
-		mailbox = &mailboxes[msgs[i].mailbox_idx];
-		if (dsync_brain_msg_sync_add_new_msg(dest_iter,
-						     &mailbox->box.guid, i,
-						     msgs[i].msg) <= 0) {
+	unsigned int i, msg_count;
+	bool ret = TRUE;
+
+	msgs = array_get(&iter->new_msgs, &msg_count);
+	for (i = iter->next_new_msg; i < msg_count; i++) {
+		if (msgs[i].mailbox_idx != iter->mailbox_idx) {
+			ret = FALSE;
+			break;
+		}
+		if (dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
+						     i, &msgs[i]) <= 0) {
 			/* failed / continue later */
-			dest_iter->next_new_msg = i + 1;
+			i++;
 			break;
 		}
 	}
-	dest_iter->msgs_sent = TRUE;
-}
-
-static void
-dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
-{
-	dsync_brain_msg_iter_add_new_msgs(iter);
-
-	if (iter->sync->dest_msg_iter->msgs_sent &&
-	    iter->sync->src_msg_iter->msgs_sent &&
-	    dsync_brain_msg_sync_is_save_done(iter->sync))
-		dsync_brain_msg_sync_retry_copies(iter->sync);
-}
-
-static void dsync_worker_new_msg_output(void *context)
-{
-	struct dsync_brain_msg_iter *iter = context;
-
-	dsync_brain_msg_sync_add_new_msgs(iter);
-}
-
-static void
-dsync_brain_msg_iter_sync_new_msgs(struct dsync_brain_msg_iter *iter)
-{
-	dsync_worker_set_input_callback(iter->worker, NULL, iter);
-	dsync_worker_set_output_callback(iter->worker,
-					 dsync_worker_new_msg_output, iter);
-	dsync_brain_msg_sync_add_new_msgs(iter);
-}
-
-void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync)
-{
-	dsync_brain_msg_iter_sync_new_msgs(sync->src_msg_iter);
-	dsync_brain_msg_iter_sync_new_msgs(sync->dest_msg_iter);
-}
-
-static void
-dsync_brain_msg_iter_sync_retry_copies(struct dsync_brain_msg_iter *iter)
+	iter->next_new_msg = i;
+	if (i == msg_count)
+		ret = FALSE;
+
+	/* flush copy commands */
+	if (dsync_worker_output_flush(iter->worker) > 0 && ret) {
+		/* we have more space again, continue */
+		return dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid);
+	} else {
+		return ret;
+	}
+}
+
+static void
+dsync_brain_mailbox_save_conflicts(struct dsync_brain_msg_iter *iter)
+{
+	const struct dsync_brain_uid_conflict *conflicts;
+	unsigned int i, count;
+
+	conflicts = array_get(&iter->uid_conflicts, &count);
+	for (i = iter->next_conflict; i < count; i++) {
+		if (conflicts[i].mailbox_idx != iter->mailbox_idx)
+			break;
+
+		dsync_worker_msg_update_uid(iter->worker, conflicts[i].old_uid,
+					    conflicts[i].new_uid);
+	}
+	iter->next_conflict = i;
+}
+
+static void
+dsync_brain_mailbox_retry_copies(struct dsync_brain_msg_iter *iter,
+				 const mailbox_guid_t *mailbox_guid)
 {
 	const uint32_t *indexes;
-	const struct dsync_brain_mailbox *mailboxes, *mailbox;
 	const struct dsync_brain_new_msg *msgs;
-	unsigned int i, msg_idx, idx_count, msg_count, mailbox_count;
+	unsigned int i, msg_idx, idx_count, msg_count;
 	struct dsync_brain_guid_instance *inst;
 	const char *guid_str;
 	void *orig_key, *orig_value;
@@ -208,37 +199,120 @@ dsync_brain_msg_iter_sync_retry_copies(s
 
 	/* try saving again. there probably weren't many of them, so don't
 	   worry about filling output buffer. */
-	mailboxes = array_get(&iter->sync->mailboxes, &mailbox_count);
 	for (i = 0; i < idx_count; i++) {
 		msg_idx = indexes[i];
-		mailbox = &mailboxes[msgs[msg_idx].mailbox_idx];
-		(void)dsync_brain_msg_sync_add_new_msg(iter, &mailbox->box.guid,
-						       msg_idx,
-						       msgs[msg_idx].msg);
+		(void)dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
+						       msg_idx, &msgs[msg_idx]);
 	}
 
 	/* if we copied anything, we'll again have to wait for the results */
 	array_clear(&iter->copy_retry_indexes);
-	dsync_worker_set_output_callback(iter->worker, NULL, NULL);
-}
-
-static void
-dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync)
-{
-	dsync_brain_msg_iter_sync_retry_copies(sync->dest_msg_iter);
-	dsync_brain_msg_iter_sync_retry_copies(sync->src_msg_iter);
-
-	if (dsync_brain_msg_sync_is_save_done(sync)) {
-		dsync_worker_set_input_callback(sync->src_worker, NULL, NULL);
-		dsync_worker_set_input_callback(sync->dest_worker, NULL, NULL);
-		sync->brain->state++;
-		dsync_brain_sync(sync->brain);
-	} else {
-		/* temporarily move back the state. once copies have returned
-		   success/failures, we'll get back to this function and see
-		   if we need to retry again */
-		sync->brain->state--;
-	}
+}
+
+static void
+dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
+{
+	const struct dsync_brain_mailbox *mailbox;


More information about the dovecot-cvs mailing list