dovecot-2.1: lmtp: Simplify LMTP proxying by first reading the whole input to memory/disk.
dovecot at dovecot.org
dovecot at dovecot.org
Sat Dec 10 10:59:46 EET 2011
details: http://hg.dovecot.org/dovecot-2.1/rev/51d87deb5c26
changeset: 13846:51d87deb5c26
user: Timo Sirainen <tss at iki.fi>
date: Sat Dec 10 10:59:30 2011 +0200
description:
lmtp: Simplify LMTP proxying by first reading the whole input to memory/disk.
This hopefully fixes problems related to LMTP proxying, at the cost of
having to write large mails to a temp directory.
diffstat:
src/lmtp/commands.c | 30 +----
src/lmtp/lmtp-proxy.c | 258 ++++++-------------------------------------------
src/lmtp/lmtp-proxy.h | 2 +-
3 files changed, 42 insertions(+), 248 deletions(-)
diffs (truncated from 439 to 300 lines):
diff -r 9d1336b40592 -r 51d87deb5c26 src/lmtp/commands.c
--- a/src/lmtp/commands.c Sat Dec 10 08:50:43 2011 +0200
+++ b/src/lmtp/commands.c Sat Dec 10 10:59:30 2011 +0200
@@ -717,18 +717,12 @@
client_input_handle(client);
}
-static void client_proxy_finish(bool timeout, void *context)
+static void client_proxy_finish(void *context)
{
struct client *client = context;
lmtp_proxy_deinit(&client->proxy);
- if (timeout) {
- client_destroy(client,
- t_strdup_printf("421 4.4.2 %s", client->my_domain),
- "Disconnected for inactivity");
- } else {
- client_input_data_finish(client);
- }
+ client_input_data_finish(client);
}
static const char *client_get_added_headers(struct client *client)
@@ -765,10 +759,12 @@
struct istream *input;
bool ret = TRUE;
+ io_remove(&client->io);
i_stream_destroy(&client->dot_input);
input = client_get_input(client);
- client_input_data_write_local(client, input);
+ if (array_count(&client->state.rcpt_to) != 0)
+ client_input_data_write_local(client, input);
if (client->proxy != NULL) {
lmtp_proxy_start(client->proxy, input, NULL,
client_proxy_finish, client);
@@ -896,18 +892,8 @@
client_send_line(client, "354 OK");
io_remove(&client->io);
- if (array_count(&client->state.rcpt_to) == 0) {
- client->state.name = "DATA (proxy)";
- timeout_remove(&client->to_idle);
- lmtp_proxy_start(client->proxy, client->dot_input,
- client->state.added_headers,
- client_proxy_finish, client);
- i_stream_unref(&client->dot_input);
- } else {
- client->state.name = "DATA";
- client->io = io_add(client->fd_in, IO_READ,
- client_input_data, client);
- client_input_data_handle(client);
- }
+ client->state.name = "DATA";
+ client->io = io_add(client->fd_in, IO_READ, client_input_data, client);
+ client_input_data_handle(client);
return -1;
}
diff -r 9d1336b40592 -r 51d87deb5c26 src/lmtp/lmtp-proxy.c
--- a/src/lmtp/lmtp-proxy.c Sat Dec 10 08:50:43 2011 +0200
+++ b/src/lmtp/lmtp-proxy.c Sat Dec 10 10:59:30 2011 +0200
@@ -4,13 +4,11 @@
#include "array.h"
#include "ioloop.h"
#include "istream.h"
-#include "istream-tee.h"
#include "ostream.h"
#include "lmtp-client.h"
#include "lmtp-proxy.h"
#define LMTP_MAX_LINE_LEN 1024
-#define LMTP_PROXY_DATA_INPUT_TIMEOUT_MSECS (1000*60)
struct lmtp_proxy_recipient {
struct lmtp_proxy_connection *conn;
@@ -27,6 +25,7 @@
struct lmtp_client *client;
struct istream *data_input;
+ struct timeout *to;
unsigned int finished:1;
unsigned int failed:1;
@@ -41,11 +40,9 @@
ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
unsigned int next_data_reply_idx;
- struct timeout *to, *to_data_idle, *to_finish;
- struct io *io;
- struct istream *data_input, *orig_data_input;
+ struct timeout *to_finish;
+ struct istream *data_input;
struct ostream *client_output;
- struct tee_istream *tee_data_input;
unsigned int max_timeout_msecs;
@@ -53,12 +50,9 @@
void *finish_context;
unsigned int finished:1;
- unsigned int input_timeout:1;
- unsigned int handling_data_input:1;
};
static void lmtp_conn_finish(void *context);
-static void lmtp_proxy_data_input(struct lmtp_proxy *proxy);
struct lmtp_proxy *
lmtp_proxy_init(const char *my_hostname, const char *dns_client_socket_path,
@@ -102,14 +96,8 @@
i_stream_unref(&proxy->data_input);
if (proxy->client_output != NULL)
o_stream_unref(&proxy->client_output);
- if (proxy->to_data_idle != NULL)
- timeout_remove(&proxy->to_data_idle);
if (proxy->to_finish != NULL)
timeout_remove(&proxy->to_finish);
- if (proxy->to != NULL)
- timeout_remove(&proxy->to);
- if (proxy->io != NULL)
- io_remove(&proxy->io);
array_free(&proxy->rcpt_to);
array_free(&proxy->connections);
pool_unref(&proxy->pool);
@@ -184,11 +172,19 @@
timeout_remove(&proxy->to_finish);
proxy->finished = TRUE;
- proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
+ proxy->finish_callback(proxy->finish_context);
}
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
{
+ if (proxy->finish_callback == NULL) {
+ /* DATA command hasn't been sent yet */
+ return;
+ }
+ if (!lmtp_proxy_send_data_replies(proxy)) {
+ /* we haven't received a reply from all clients yet */
+ return;
+ }
/* do the actual finishing in a timeout handler, since the finish
callback causes the proxy to be destroyed and the code leading up
to this function can be called from many different places. it's
@@ -200,75 +196,18 @@
}
}
-static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
-{
- if (proxy->finish_callback == NULL) {
- /* DATA command hasn't been sent yet */
- return;
- }
- if (lmtp_proxy_send_data_replies(proxy) &&
- (proxy->data_input == NULL ||
- proxy->data_input->eof ||
- proxy->data_input->stream_errno != 0 ||
- proxy->input_timeout))
- lmtp_proxy_finish(proxy);
-}
-
static void lmtp_conn_finish(void *context)
{
struct lmtp_proxy_connection *conn = context;
conn->finished = TRUE;
+ if (conn->to != NULL)
+ timeout_remove(&conn->to);
if (conn->data_input != NULL)
i_stream_unref(&conn->data_input);
lmtp_proxy_try_finish(conn->proxy);
}
-static void lmtp_proxy_fail_all(struct lmtp_proxy *proxy, const char *reason)
-{
- struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
- const char *line;
-
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
- " (%s while waiting for reply to %s)", reason,
- lmtp_client_state_to_string(conns[i]->client));
- lmtp_client_fail(conns[i]->client, line);
- }
-
- if (proxy->to_finish == NULL) {
- /* we still have some DATA input to read */
- if (proxy->io == NULL) {
- proxy->io = io_add(i_stream_get_fd(proxy->data_input),
- IO_READ,
- lmtp_proxy_data_input, proxy);
- }
- }
-}
-
-static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
-{
- struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
-
- proxy->input_timeout = TRUE;
- i_stream_close(proxy->orig_data_input);
-
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
- " (timeout in DATA input)");
- }
- if (proxy->to_finish == NULL) {
- /* we had earlier failed all clients already and were just
- waiting for DATA input to finish, but DATA input also failed
- with a timeout. */
- lmtp_proxy_finish(proxy);
- }
-}
-
static void
lmtp_proxy_conn_rcpt_to(bool success, const char *reply, void *context)
{
@@ -316,151 +255,21 @@
return 0;
}
-static uoff_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
+static void lmtp_proxy_more_data_sent(void *context)
{
- struct lmtp_proxy_connection *const *conns;
- uoff_t min_offset = (uoff_t)-1;
+ struct lmtp_proxy_connection *conn = context;
- array_foreach(&proxy->connections, conns) {
- struct lmtp_proxy_connection *conn = *conns;
-
- if (conn->data_input != NULL &&
- min_offset > conn->data_input->v_offset &&
- i_stream_have_bytes_left(conn->data_input))
- min_offset = conn->data_input->v_offset;
- }
- return min_offset;
+ lmtp_client_send_more(conn->client);
}
-static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
+static void lmtp_proxy_conn_timeout(struct lmtp_proxy_connection *conn)
{
- struct lmtp_proxy_connection *const *conns;
- uoff_t min_offset;
- size_t size;
- const char *errstr;
+ const char *line;
- min_offset = lmtp_proxy_find_lowest_offset(proxy);
- if (min_offset == (uoff_t)-1)
- return FALSE;
-
- /* disconnect all connections that are keeping us from reading
- more input. */
- array_foreach(&proxy->connections, conns) {
- struct lmtp_proxy_connection *conn = *conns;
-
- if (conn->data_input != NULL &&
- conn->data_input->v_offset == min_offset) {
- (void)i_stream_get_data(conn->data_input, &size);
- errstr = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
- " (DATA output stalled for %u secs, "
- "%"PRIuUOFF_T"B sent, %"PRIuSIZE_T"B buffered)",
- proxy->max_timeout_msecs/1000,
- min_offset, size);
- lmtp_client_fail(conn->client, errstr);
- }
- }
- return TRUE;
-}
-
-static void lmtp_proxy_output_timeout(struct lmtp_proxy *proxy)
-{
- timeout_remove(&proxy->to);
-
- /* drop the connection with the most unread data */
- if (lmtp_proxy_disconnect_hanging_output(proxy))
- lmtp_proxy_data_input(proxy);
- else {
- /* no such connection, so we've already sent everything but
- some servers aren't replying to us. disconnect all of
- them. */
- i_assert(proxy->data_input->eof);
- lmtp_proxy_fail_all(proxy, "timeout");
- }
-}
-
-static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy)
-{
More information about the dovecot-cvs
mailing list