dovecot-2.1: lmtp: Simplify LMTP proxying by first reading the w...

dovecot at dovecot.org dovecot at dovecot.org
Sat Dec 10 10:59:46 EET 2011


details:   http://hg.dovecot.org/dovecot-2.1/rev/51d87deb5c26
changeset: 13846:51d87deb5c26
user:      Timo Sirainen <tss at iki.fi>
date:      Sat Dec 10 10:59:30 2011 +0200
description:
lmtp: Simplify LMTP proxying by first reading the whole input to memory/disk.
This hopefully fixes problems related to LMTP proxying, at the cost of
having to write large mails to temp directory.

diffstat:

 src/lmtp/commands.c   |   30 +----
 src/lmtp/lmtp-proxy.c |  258 ++++++-------------------------------------------
 src/lmtp/lmtp-proxy.h |    2 +-
 3 files changed, 42 insertions(+), 248 deletions(-)

diffs (truncated from 439 to 300 lines):

diff -r 9d1336b40592 -r 51d87deb5c26 src/lmtp/commands.c
--- a/src/lmtp/commands.c	Sat Dec 10 08:50:43 2011 +0200
+++ b/src/lmtp/commands.c	Sat Dec 10 10:59:30 2011 +0200
@@ -717,18 +717,12 @@
 		client_input_handle(client);
 }
 
-static void client_proxy_finish(bool timeout, void *context)
+static void client_proxy_finish(void *context)
 {
 	struct client *client = context;
 
 	lmtp_proxy_deinit(&client->proxy);
-	if (timeout) {
-		client_destroy(client,
-			t_strdup_printf("421 4.4.2 %s", client->my_domain),
-			"Disconnected for inactivity");
-	} else {
-		client_input_data_finish(client);
-	}
+	client_input_data_finish(client);
 }
 
 static const char *client_get_added_headers(struct client *client)
@@ -765,10 +759,12 @@
 	struct istream *input;
 	bool ret = TRUE;
 
+	io_remove(&client->io);
 	i_stream_destroy(&client->dot_input);
 
 	input = client_get_input(client);
-	client_input_data_write_local(client, input);
+	if (array_count(&client->state.rcpt_to) != 0)
+		client_input_data_write_local(client, input);
 	if (client->proxy != NULL) {
 		lmtp_proxy_start(client->proxy, input, NULL,
 				 client_proxy_finish, client);
@@ -896,18 +892,8 @@
 	client_send_line(client, "354 OK");
 
 	io_remove(&client->io);
-	if (array_count(&client->state.rcpt_to) == 0) {
-		client->state.name = "DATA (proxy)";
-		timeout_remove(&client->to_idle);
-		lmtp_proxy_start(client->proxy, client->dot_input,
-				 client->state.added_headers,
-				 client_proxy_finish, client);
-		i_stream_unref(&client->dot_input);
-	} else {
-		client->state.name = "DATA";
-		client->io = io_add(client->fd_in, IO_READ,
-				    client_input_data, client);
-		client_input_data_handle(client);
-	}
+	client->state.name = "DATA";
+	client->io = io_add(client->fd_in, IO_READ, client_input_data, client);
+	client_input_data_handle(client);
 	return -1;
 }
diff -r 9d1336b40592 -r 51d87deb5c26 src/lmtp/lmtp-proxy.c
--- a/src/lmtp/lmtp-proxy.c	Sat Dec 10 08:50:43 2011 +0200
+++ b/src/lmtp/lmtp-proxy.c	Sat Dec 10 10:59:30 2011 +0200
@@ -4,13 +4,11 @@
 #include "array.h"
 #include "ioloop.h"
 #include "istream.h"
-#include "istream-tee.h"
 #include "ostream.h"
 #include "lmtp-client.h"
 #include "lmtp-proxy.h"
 
 #define LMTP_MAX_LINE_LEN 1024
-#define LMTP_PROXY_DATA_INPUT_TIMEOUT_MSECS (1000*60)
 
 struct lmtp_proxy_recipient {
 	struct lmtp_proxy_connection *conn;
@@ -27,6 +25,7 @@
 
 	struct lmtp_client *client;
 	struct istream *data_input;
+	struct timeout *to;
 
 	unsigned int finished:1;
 	unsigned int failed:1;
@@ -41,11 +40,9 @@
 	ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
 	unsigned int next_data_reply_idx;
 
-	struct timeout *to, *to_data_idle, *to_finish;
-	struct io *io;
-	struct istream *data_input, *orig_data_input;
+	struct timeout *to_finish;
+	struct istream *data_input;
 	struct ostream *client_output;
-	struct tee_istream *tee_data_input;
 
 	unsigned int max_timeout_msecs;
 
@@ -53,12 +50,9 @@
 	void *finish_context;
 
 	unsigned int finished:1;
-	unsigned int input_timeout:1;
-	unsigned int handling_data_input:1;
 };
 
 static void lmtp_conn_finish(void *context);
-static void lmtp_proxy_data_input(struct lmtp_proxy *proxy);
 
 struct lmtp_proxy *
 lmtp_proxy_init(const char *my_hostname, const char *dns_client_socket_path,
@@ -102,14 +96,8 @@
 		i_stream_unref(&proxy->data_input);
 	if (proxy->client_output != NULL)
 		o_stream_unref(&proxy->client_output);
-	if (proxy->to_data_idle != NULL)
-		timeout_remove(&proxy->to_data_idle);
 	if (proxy->to_finish != NULL)
 		timeout_remove(&proxy->to_finish);
-	if (proxy->to != NULL)
-		timeout_remove(&proxy->to);
-	if (proxy->io != NULL)
-		io_remove(&proxy->io);
 	array_free(&proxy->rcpt_to);
 	array_free(&proxy->connections);
 	pool_unref(&proxy->pool);
@@ -184,11 +172,19 @@
 
 	timeout_remove(&proxy->to_finish);
 	proxy->finished = TRUE;
-	proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
+	proxy->finish_callback(proxy->finish_context);
 }
 
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
 {
+	if (proxy->finish_callback == NULL) {
+		/* DATA command hasn't been sent yet */
+		return;
+	}
+	if (!lmtp_proxy_send_data_replies(proxy)) {
+		/* we haven't received a reply from all clients yet */
+		return;
+	}
 	/* do the actual finishing in a timeout handler, since the finish
 	   callback causes the proxy to be destroyed and the code leading up
 	   to this function can be called from many different places. it's
@@ -200,75 +196,18 @@
 	}
 }
 
-static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
-{
-	if (proxy->finish_callback == NULL) {
-		/* DATA command hasn't been sent yet */
-		return;
-	}
-	if (lmtp_proxy_send_data_replies(proxy) &&
-	    (proxy->data_input == NULL ||
-	     proxy->data_input->eof ||
-	     proxy->data_input->stream_errno != 0 ||
-	     proxy->input_timeout))
-		lmtp_proxy_finish(proxy);
-}
-
 static void lmtp_conn_finish(void *context)
 {
 	struct lmtp_proxy_connection *conn = context;
 
 	conn->finished = TRUE;
+	if (conn->to != NULL)
+		timeout_remove(&conn->to);
 	if (conn->data_input != NULL)
 		i_stream_unref(&conn->data_input);
 	lmtp_proxy_try_finish(conn->proxy);
 }
 
-static void lmtp_proxy_fail_all(struct lmtp_proxy *proxy, const char *reason)
-{
-	struct lmtp_proxy_connection *const *conns;
-	unsigned int i, count;
-	const char *line;
-
-	conns = array_get(&proxy->connections, &count);
-	for (i = 0; i < count; i++) {
-		line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
-				" (%s while waiting for reply to %s)", reason,
-				lmtp_client_state_to_string(conns[i]->client));
-		lmtp_client_fail(conns[i]->client, line);
-	}
-
-	if (proxy->to_finish == NULL) {
-		/* we still have some DATA input to read */
-		if (proxy->io == NULL) {
-			proxy->io = io_add(i_stream_get_fd(proxy->data_input),
-					   IO_READ,
-					   lmtp_proxy_data_input, proxy);
-		}
-	}
-}
-
-static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
-{
-	struct lmtp_proxy_connection *const *conns;
-	unsigned int i, count;
-
-	proxy->input_timeout = TRUE;
-	i_stream_close(proxy->orig_data_input);
-
-	conns = array_get(&proxy->connections, &count);
-	for (i = 0; i < count; i++) {
-		lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
-				 " (timeout in DATA input)");
-	}
-	if (proxy->to_finish == NULL) {
-		/* we had earlier failed all clients already and were just
-		   waiting for DATA input to finish, but DATA input also failed
-		   with a timeout. */
-		lmtp_proxy_finish(proxy);
-	}
-}
-
 static void
 lmtp_proxy_conn_rcpt_to(bool success, const char *reply, void *context)
 {
@@ -316,151 +255,21 @@
 	return 0;
 }
 
-static uoff_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
+static void lmtp_proxy_more_data_sent(void *context)
 {
-	struct lmtp_proxy_connection *const *conns;
-	uoff_t min_offset = (uoff_t)-1;
+	struct lmtp_proxy_connection *conn = context;
 
-	array_foreach(&proxy->connections, conns) {
-		struct lmtp_proxy_connection *conn = *conns;
-
-		if (conn->data_input != NULL &&
-		    min_offset > conn->data_input->v_offset &&
-		    i_stream_have_bytes_left(conn->data_input))
-			min_offset = conn->data_input->v_offset;
-	}
-	return min_offset;
+	lmtp_client_send_more(conn->client);
 }
 
-static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
+static void lmtp_proxy_conn_timeout(struct lmtp_proxy_connection *conn)
 {
-	struct lmtp_proxy_connection *const *conns;
-	uoff_t min_offset;
-	size_t size;
-	const char *errstr;
+	const char *line;
 
-	min_offset = lmtp_proxy_find_lowest_offset(proxy);
-	if (min_offset == (uoff_t)-1)
-		return FALSE;
-
-	/* disconnect all connections that are keeping us from reading
-	   more input. */
-	array_foreach(&proxy->connections, conns) {
-		struct lmtp_proxy_connection *conn = *conns;
-
-		if (conn->data_input != NULL &&
-		    conn->data_input->v_offset == min_offset) {
-			(void)i_stream_get_data(conn->data_input, &size);
-			errstr = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
-				" (DATA output stalled for %u secs, "
-				"%"PRIuUOFF_T"B sent, %"PRIuSIZE_T"B buffered)",
-				proxy->max_timeout_msecs/1000,
-				min_offset, size);
-			lmtp_client_fail(conn->client, errstr);
-		}
-	}
-	return TRUE;
-}
-
-static void lmtp_proxy_output_timeout(struct lmtp_proxy *proxy)
-{
-	timeout_remove(&proxy->to);
-
-	/* drop the connection with the most unread data */
-	if (lmtp_proxy_disconnect_hanging_output(proxy))
-		lmtp_proxy_data_input(proxy);
-	else {
-		/* no such connection, so we've already sent everything but
-		   some servers aren't replying to us. disconnect all of
-		   them. */
-		i_assert(proxy->data_input->eof);
-		lmtp_proxy_fail_all(proxy, "timeout");
-	}
-}
-
-static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy)
-{


More information about the dovecot-cvs mailing list