[dovecot-cvs] dovecot/src/pop3 client.c, 1.28, 1.29 client.h, 1.5, 1.6 commands.c, 1.19, 1.20

cras at dovecot.org cras at dovecot.org
Sun Aug 15 06:40:34 EEST 2004


Update of /home/cvs/dovecot/src/pop3
In directory talvi:/tmp/cvs-serv20173/pop3

Modified Files:
	client.c client.h commands.c 
Log Message:
We never do blocking reads/writes to network anymore. Changed imap and pop3
processes to use a single I/O loop.

Not much tested yet, and currently LIST/LSUB may eat too much memory and
APPEND eats all CPU.



Index: client.c
===================================================================
RCS file: /home/cvs/dovecot/src/pop3/client.c,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -d -r1.28 -r1.29
--- client.c	12 Jul 2004 11:35:51 -0000	1.28
+++ client.c	15 Aug 2004 03:40:32 -0000	1.29
@@ -5,6 +5,7 @@
 #include "network.h"
 #include "istream.h"
 #include "ostream.h"
+#include "str.h"
 #include "mail-storage.h"
 #include "commands.h"
 #include "mail-search.h"
@@ -14,7 +15,11 @@
 /* max. length of input command line (spec says 512) */
 #define MAX_INBUF_SIZE 2048
 
-/* If we can't send a buffer in a minute, disconnect the client */
+/* Stop reading input when output buffer has this many bytes. Once the buffer
+   size has dropped to half of it, start reading input again. */
+#define OUTBUF_THROTTLE_SIZE 4096
+
+/* If we can't send anything for a minute, disconnect the client */
 #define CLIENT_OUTPUT_TIMEOUT (60*1000)
 
 /* Disconnect client when it sends too many bad commands in a row */
@@ -29,14 +34,7 @@
 static struct timeout *to_idle;
 
 static void client_input(void *context);
-
-static void client_output_timeout(void *context)
-{
-	struct client *client = context;
-
-	i_stream_close(client->input);
-	o_stream_close(client->output);
-}
+static void client_output(void *context);
 
 static int sync_mailbox(struct mailbox *box)
 {
@@ -131,14 +129,16 @@
 	struct client *client;
         enum mailbox_open_flags flags;
 
+	/* always use nonblocking I/O */
+	net_set_nonblock(hin, TRUE);
+	net_set_nonblock(hout, TRUE);
+
 	client = i_new(struct client, 1);
 	client->input = i_stream_create_file(hin, default_pool,
 					     MAX_INBUF_SIZE, FALSE);
-	client->output = o_stream_create_file(hout, default_pool, 4096, FALSE);
-
-	/* set timeout for sending data */
-	o_stream_set_blocking(client->output, CLIENT_OUTPUT_TIMEOUT,
-			      client_output_timeout, client);
+	client->output = o_stream_create_file(hout, default_pool,
+					      (size_t)-1, FALSE);
+	o_stream_set_flush_callback(client->output, client_output, client);
 
 	client->io = io_add(hin, IO_READ, client_input, client);
         client->last_input = ioloop_time;
@@ -171,8 +171,6 @@
 
 void client_destroy(struct client *client)
 {
-	o_stream_flush(client->output);
-
 	if (client->mailbox != NULL)
 		mailbox_close(client->mailbox);
 	mail_storage_destroy(client->storage);
@@ -180,7 +178,8 @@
 	i_free(client->message_sizes);
 	i_free(client->deleted_bitmask);
 
-	io_remove(client->io);
+	if (client->io != NULL)
+		io_remove(client->io);
 
 	i_stream_unref(client->input);
 	o_stream_unref(client->output);
@@ -194,25 +193,52 @@
 
 void client_disconnect(struct client *client)
 {
-	o_stream_flush(client->output);
+	(void)o_stream_flush(client->output);
 
 	i_stream_close(client->input);
 	o_stream_close(client->output);
 }
 
-void client_send_line(struct client *client, const char *fmt, ...)
+int client_send_line(struct client *client, const char *fmt, ...)
 {
 	va_list va;
+	string_t *str;
+	ssize_t ret;
 
 	if (client->output->closed)
-		return;
+		return -1;
 
 	t_push();
 	va_start(va, fmt);
-	(void)o_stream_send_str(client->output, t_strdup_vprintf(fmt, va));
-	(void)o_stream_send(client->output, "\r\n", 2);
+
+	str = t_str_new(256);
+	str_vprintfa(str, fmt, va);
+	str_append(str, "\r\n");
+
+	ret = o_stream_send(client->output, str_data(str), str_len(str));
+	if (ret < 0)
+		client_destroy(client);
+	else {
+		i_assert((size_t)ret == str_len(str));
+
+		if (o_stream_get_buffer_used_size(client->output) <
+		    OUTBUF_THROTTLE_SIZE) {
+			ret = 1;
+			client->last_output = ioloop_time;
+		} else {
+			ret = 0;
+			if (client->io != NULL) {
+				/* no more input until client has read
+				   our output */
+				io_remove(client->io);
+				client->io = NULL;
+			}
+		}
+	}
+
 	va_end(va);
 	t_pop();
+	return (int)ret;
 }
 
 void client_send_storage_error(struct client *client)
@@ -237,6 +263,16 @@
 	struct client *client = context;
 	char *line, *args;
 
+	if (client->cmd != NULL) {
+		/* we're still processing a command. wait until it's
+		   finished. */
+		io_remove(client->io);
+		client->io = NULL;
+		client->waiting_input = TRUE;
+		return;
+	}
+
+	client->waiting_input = FALSE;
 	client->last_input = ioloop_time;
 
 	switch (i_stream_read(client->input)) {
@@ -260,28 +296,63 @@
 		else
 			*args++ = '\0';
 
-		if (client_command_execute(client, line, args))
+		if (client_command_execute(client, line, args)) {
 			client->bad_counter = 0;
-		else if (++client->bad_counter > CLIENT_MAX_BAD_COMMANDS) {
+			if (client->cmd != NULL) {
+				client->waiting_input = TRUE;
+				break;
+			}
+		} else if (++client->bad_counter > CLIENT_MAX_BAD_COMMANDS) {
 			client_send_line(client, "-ERR Too many bad commands.");
 			client_disconnect(client);
 		}
 	}
-	o_stream_flush(client->output);
+	o_stream_uncork(client->output);
 
 	if (client->output->closed)
 		client_destroy(client);
 }
 
+static void client_output(void *context)
+{
+	struct client *client = context;
+	int ret;
+
+	if ((ret = o_stream_flush(client->output)) < 0) {
+		client_destroy(client);
+		return;
+	}
+
+	client->last_output = ioloop_time;
+
+	if (o_stream_get_buffer_used_size(client->output) <
+	    OUTBUF_THROTTLE_SIZE/2 && client->io == NULL &&
+	    client->cmd == NULL) {
+		/* enable input again */
+		client->io = io_add(i_stream_get_fd(client->input), IO_READ,
+				    client_input, client);
+		if (client->waiting_input)
+			client_input(client);
+	}
+}
+
 static void idle_timeout(void *context __attr_unused__)
 {
 	if (my_client == NULL)
 		return;
 
-	if (ioloop_time - my_client->last_input >= CLIENT_IDLE_TIMEOUT) {
-		client_send_line(my_client,
-				 "-ERR Disconnected for inactivity.");
-		client_destroy(my_client);
+	if (my_client->cmd != NULL) {
+		if (ioloop_time - my_client->last_output >=
+		    CLIENT_OUTPUT_TIMEOUT &&
+		    my_client->last_input < my_client->last_output)
+			client_destroy(my_client);
+	} else {
+		if (ioloop_time - my_client->last_input >=
+		    CLIENT_IDLE_TIMEOUT) {
+			client_send_line(my_client,
+					 "-ERR Disconnected for inactivity.");
+			client_destroy(my_client);
+		}
 	}
 }
 

Index: client.h
===================================================================
RCS file: /home/cvs/dovecot/src/pop3/client.h,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- client.h	23 Jun 2004 18:33:22 -0000	1.5
+++ client.h	15 Aug 2004 03:40:32 -0000	1.6
@@ -1,18 +1,24 @@
 #ifndef __CLIENT_H
 #define __CLIENT_H
 
+struct client;
 struct mail_storage;
 
+typedef void command_func_t(struct client *client);
+
 struct client {
 	int socket;
 	struct io *io;
 	struct istream *input;
 	struct ostream *output;
 
+	command_func_t *cmd;
+	void *cmd_context;
+
 	struct mail_storage *storage;
 	struct mailbox *mailbox;
 
-	time_t last_input;
+	time_t last_input, last_output;
 	unsigned int bad_counter;
 
 	unsigned int messages_count;
@@ -24,6 +30,7 @@
 	unsigned char *deleted_bitmask;
 
 	unsigned int deleted:1;
+	unsigned int waiting_input:1;
 };
 
 /* Create new client with specified input/output handles. socket specifies
@@ -35,7 +42,7 @@
 void client_disconnect(struct client *client);
 
 /* Send a line of data to client */
-void client_send_line(struct client *client, const char *fmt, ...)
+int client_send_line(struct client *client, const char *fmt, ...)
 	__attr_format__(2, 3);
 void client_send_storage_error(struct client *client);
 

Index: commands.c
===================================================================
RCS file: /home/cvs/dovecot/src/pop3/commands.c,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -d -r1.19 -r1.20
--- commands.c	12 Jul 2004 18:14:45 -0000	1.19
+++ commands.c	15 Aug 2004 03:40:32 -0000	1.20
@@ -111,23 +111,48 @@
 	return TRUE;
 }
 
+struct cmd_list_context {
+	unsigned int msgnum;
+};
+
+static void cmd_list_callback(struct client *client)
+{
+	struct cmd_list_context *ctx = client->cmd_context;
+	int ret;
+
+	for (; ctx->msgnum != client->messages_count; ctx->msgnum++) {
+		if (client->deleted) {
+			if (client->deleted_bitmask[ctx->msgnum / CHAR_BIT] &
+			    (1 << (ctx->msgnum % CHAR_BIT)))
+				continue;
+		}
+		ret = client_send_line(client, "%u %"PRIuUOFF_T,
+				       ctx->msgnum+1,
+				       client->message_sizes[ctx->msgnum]);
+		if (ret < 0)
+			break;
+		if (ret == 0)
+			return;
+	}
+
+	client_send_line(client, ".");
+
+	i_free(ctx);
+	client->cmd = NULL;
+}
+
 static int cmd_list(struct client *client, const char *args)
 {
-	unsigned int i;
+        struct cmd_list_context *ctx;
 
 	if (*args == '\0') {
+		ctx = i_new(struct cmd_list_context, 1);
 		client_send_line(client, "+OK %u messages:",
 				 client->messages_count - client->deleted_count);
-		for (i = 0; i < client->messages_count; i++) {
-			if (client->deleted) {
-				if (client->deleted_bitmask[i / CHAR_BIT] &
-				    (1 << (i % CHAR_BIT)))
-					continue;
-			}
-			client_send_line(client, "%u %"PRIuUOFF_T,
-					 i+1, client->message_sizes[i]);
-		}
-		client_send_line(client, ".");
+
+		client->cmd = cmd_list_callback;
+		client->cmd_context = ctx;
+		cmd_list_callback(client);
 	} else {
 		unsigned int msgnum;
 
@@ -198,127 +223,149 @@
 	return TRUE;
 }
 
-static void stream_send_escaped(struct ostream *output, struct istream *input,
-				uoff_t body_lines)
+struct fetch_context {
+        struct mailbox_transaction_context *t;
+	struct mail_search_context *search_ctx;
+	struct istream *stream;
+	uoff_t body_lines;
+
+	struct mail_search_arg search_arg;
+        struct mail_search_seqset seqset;
+
+	unsigned char last;
+	int cr_skipped, in_body;
+};
+
+static void fetch_deinit(struct fetch_context *ctx)
 {
+	(void)mailbox_search_deinit(ctx->search_ctx);
+	(void)mailbox_transaction_commit(ctx->t);
+	i_free(ctx);
+}
+
+static void fetch_callback(struct client *client)
+{
+	struct fetch_context *ctx = client->cmd_context;
 	const unsigned char *data;
-	unsigned char last, add;
+	unsigned char add;
 	size_t i, size;
-	int cr_skipped, in_header;
+	ssize_t ret;
 
-	if (body_lines != (uoff_t)-1)
-		body_lines++; /* internally we count the empty line too */
+	o_stream_set_max_buffer_size(client->output, 0);
+
+	while ((ctx->body_lines > 0 || !ctx->in_body) &&
+	       i_stream_read_data(ctx->stream, &data, &size, 0) > 0) {
+		if (size > 4096)
+			size = 4096;
 
-	cr_skipped = FALSE; in_header = TRUE; last = '\0';
-	while ((body_lines > 0 || in_header) &&
-	       i_stream_read_data(input, &data, &size, 0) > 0) {
 		add = '\0';
 		for (i = 0; i < size; i++) {
-			if (in_header && (data[i] == '\r' || data[i] == '\n')) {
-				if (i == 0 && (last == '\0' || last == '\n'))
-					in_header = FALSE;
+			if ((data[i] == '\r' || data[i] == '\n') &&
+			    !ctx->in_body) {
+				if (i == 0 && (ctx->last == '\0' ||
+					       ctx->last == '\n'))
+					ctx->in_body = TRUE;
 				else if (i > 0 && data[i-1] == '\n')
-					in_header = FALSE;
+					ctx->in_body = TRUE;
 			}
 
 			if (data[i] == '\n') {
-				if ((i == 0 && last != '\r') ||
+				if ((i == 0 && ctx->last != '\r') ||
 				    (i > 0 && data[i-1] != '\r')) {
 					/* missing CR */
 					add = '\r';
 					break;
 				}
 
-				if (!in_header) {
-					if (--body_lines == 0) {
+				if (ctx->in_body) {
+					if (--ctx->body_lines == 0) {
 						i++;
 						break;
 					}
 				}
 			} else if (data[i] == '.' &&
-				   ((i == 0 && last == '\n') ||
+				   ((i == 0 && ctx->last == '\n') ||
 				    (i > 0 && data[i-1] == '\n'))) {
 				/* escape the dot */
 				add = '.';
-				i++;
 				break;
 			} else if (data[i] == '\0' &&
 				   (client_workarounds &
 				    WORKAROUND_OUTLOOK_NO_NULS) != 0) {
-				add = '\x80';
+				add = 0x80;
 				break;
 			}
 		}
 
-		if (o_stream_send(output, data, i) < 0)
+		if ((ret = o_stream_send(client->output, data, i)) < 0)
+			break;
+		if (ret > 0)
+			ctx->last = data[ret-1];
+		i_stream_skip(ctx->stream, ret);
+
+		if ((size_t)ret != i) {
+			/* continue later */
 			return;
+		}
 
 		if (add != '\0') {
-			if (o_stream_send(output, &add, 1) < 0)
+			if ((ret = o_stream_send(client->output, &add, 1)) < 0)
+				break;
+			if (ret == 0)
 				return;
-			last = add;
-			if (client_workarounds & WORKAROUND_OUTLOOK_NO_NULS) {
-				if (i < size && data[i] == '\0')
-					i++;
-			}
-		} else {
-			last = data[i-1];
-		}
 
-		i_stream_skip(input, i);
+			ctx->last = add;
+			if (add == 0x80)
+				i_stream_skip(ctx->stream, 1);
+		}
 	}
+	o_stream_set_max_buffer_size(client->output, (size_t)-1);
 
-	if (last != '\n') {
+	if (ctx->last != '\n') {
 		/* didn't end with CRLF */
-		(void)o_stream_send(output, "\r\n", 2);
+		(void)o_stream_send(client->output, "\r\n", 2);
 	}
+
+	client_send_line(client, ".");
+	fetch_deinit(ctx);
+	client->cmd = NULL;
 }
 
-static void fetch(struct client *client, unsigned int msgnum,
-		  uoff_t body_lines)
+static void fetch(struct client *client, unsigned int msgnum, uoff_t body_lines)
 {
-	struct mail_search_arg search_arg;
-        struct mail_search_seqset seqset;
-        struct mailbox_transaction_context *t;
-	struct mail_search_context *ctx;
+        struct fetch_context *ctx;
 	struct mail *mail;
-	struct istream *stream;
 
-	memset(&seqset, 0, sizeof(seqset));
-	seqset.seq1 = seqset.seq2 = msgnum+1;
+	ctx = i_new(struct fetch_context, 1);
 
-	memset(&search_arg, 0, sizeof(search_arg));
-	search_arg.type = SEARCH_SEQSET;
-	search_arg.value.seqset = &seqset;
+	ctx->seqset.seq1 = ctx->seqset.seq2 = msgnum+1;
+	ctx->search_arg.type = SEARCH_SEQSET;
+	ctx->search_arg.value.seqset = &ctx->seqset;
 
-	t = mailbox_transaction_begin(client->mailbox, FALSE);
-	ctx = mailbox_search_init(t, NULL, &search_arg, NULL,
-				  MAIL_FETCH_STREAM_HEADER |
-				  MAIL_FETCH_STREAM_BODY, NULL);
-	if (ctx == NULL) {
-		mailbox_transaction_rollback(t);
-		client_send_storage_error(client);
+	ctx->t = mailbox_transaction_begin(client->mailbox, FALSE);
+	ctx->search_ctx = mailbox_search_init(ctx->t, NULL, &ctx->search_arg,
+					      NULL, MAIL_FETCH_STREAM_HEADER |
+					      MAIL_FETCH_STREAM_BODY, NULL);
+	mail = mailbox_search_next(ctx->search_ctx);
+	ctx->stream = mail == NULL ? NULL : mail->get_stream(mail, NULL, NULL);
+	if (ctx->stream == NULL) {
+		client_send_line(client, "-ERR Message not found.");
+		fetch_deinit(ctx);
 		return;
 	}
 
-	mail = mailbox_search_next(ctx);
-	stream = mail == NULL ? NULL : mail->get_stream(mail, NULL, NULL);
-	if (stream == NULL)
-		client_send_line(client, "-ERR Message not found.");
-	else {
-		if (body_lines == (uoff_t)-1) {
-			client_send_line(client, "+OK %"PRIuUOFF_T" octets",
-					 client->message_sizes[msgnum]);
-		} else {
-			client_send_line(client, "+OK");
-		}
-
-		stream_send_escaped(client->output, stream, body_lines);
-		client_send_line(client, ".");
+	ctx->body_lines = body_lines;
+	if (body_lines == (uoff_t)-1) {
+		client_send_line(client, "+OK %"PRIuUOFF_T" octets",
+				 client->message_sizes[msgnum]);
+	} else {
+		client_send_line(client, "+OK");
+		ctx->body_lines++; /* internally we count the empty line too */
 	}
 
-	(void)mailbox_search_deinit(ctx);
-	(void)mailbox_transaction_commit(t);
+	client->cmd = fetch_callback;
+	client->cmd_context = ctx;
+	fetch_callback(client);
 }
 
 static int cmd_retr(struct client *client, const char *args)
@@ -368,37 +415,22 @@
 	return TRUE;
 }
 
-static void list_uids(struct client *client, unsigned int message)
-{
+struct cmd_uidl_context {
+        struct mailbox_transaction_context *t;
+	struct mail_search_context *search_ctx;
+	unsigned int message;
+
 	struct mail_search_arg search_arg;
 	struct mail_search_seqset seqset;
-        struct mailbox_transaction_context *t;
-	struct mail_search_context *ctx;
+};
+
+static int list_uids_iter(struct client *client, struct cmd_uidl_context *ctx)
+{
 	struct mail *mail;
 	const char *uid_str;
-	int found = FALSE;
-
-	if (client->messages_count == 0 && message == 0)
-		return;
-
-	memset(&search_arg, 0, sizeof(search_arg));
-	if (message == 0)
-		search_arg.type = SEARCH_ALL;
-	else {
-		seqset.seq1 = seqset.seq2 = message;
-		search_arg.type = SEARCH_SEQSET;
-		search_arg.value.seqset = &seqset;
-	}
-
-	t = mailbox_transaction_begin(client->mailbox, FALSE);
-	ctx = mailbox_search_init(t, NULL, &search_arg, NULL, 0, NULL);
-	if (ctx == NULL) {
-		mailbox_transaction_rollback(t);
-		client_send_storage_error(client);
-		return;
-	}
+	int ret, found = FALSE;
 
-	while ((mail = mailbox_search_next(ctx)) != NULL) {
+	while ((mail = mailbox_search_next(ctx->search_ctx)) != NULL) {
 		if (client->deleted) {
 			uint32_t idx = mail->seq - 1;
 			if (client->deleted_bitmask[idx / CHAR_BIT] &
@@ -407,31 +439,81 @@
 		}
 
 		uid_str = mail->get_special(mail, MAIL_FETCH_UID_STRING);
-		client_send_line(client, message == 0 ? "%u %s" : "+OK %u %s",
-				 mail->seq, uid_str);
 		found = TRUE;
+
+		ret = client_send_line(client, ctx->message == 0 ?
+				       "%u %s" : "+OK %u %s",
+				       mail->seq, uid_str);
+		if (ret < 0)
+			break;
+		if (ret == 0 && ctx->message == 0) {
+			/* output is being buffered, continue when there's
+			   more space */
+			return 0;
+		}
 	}
 
-	(void)mailbox_search_deinit(ctx);
-	(void)mailbox_transaction_commit(t);
+	/* finished */
+	(void)mailbox_search_deinit(ctx->search_ctx);
+	(void)mailbox_transaction_commit(ctx->t);
 
-	if (!found && message != 0)
-		client_send_line(client, "-ERR Message not found.");
+	client->cmd = NULL;
+
+	if (ctx->message == 0)
+		client_send_line(client, ".");
+	i_free(ctx);
+	return found;
+}
+
+static void cmd_uidl_callback(struct client *client)
+{
+	struct cmd_uidl_context *ctx = client->cmd_context;
+
+        (void)list_uids_iter(client, ctx);
+}
+
+static struct cmd_uidl_context *
+cmd_uidl_init(struct client *client, unsigned int message)
+{
+        struct cmd_uidl_context *ctx;
+
+	ctx = i_new(struct cmd_uidl_context, 1);
+
+	if (message == 0)
+		ctx->search_arg.type = SEARCH_ALL;
+	else {
+		ctx->seqset.seq1 = ctx->seqset.seq2 = message;
+		ctx->search_arg.type = SEARCH_SEQSET;
+		ctx->search_arg.value.seqset = &ctx->seqset;
+	}
+
+	ctx->t = mailbox_transaction_begin(client->mailbox, FALSE);
+	ctx->search_ctx = mailbox_search_init(ctx->t, NULL, &ctx->search_arg,
+					      NULL, 0, NULL);
+	if (message == 0) {
+		client->cmd = cmd_uidl_callback;
+		client->cmd_context = ctx;
+	}
+	return ctx;
 }
 
 static int cmd_uidl(struct client *client, const char *args)
 {
+        struct cmd_uidl_context *ctx;
+
 	if (*args == '\0') {
 		client_send_line(client, "+OK");
-		list_uids(client, 0);
-		client_send_line(client, ".");
+		ctx = cmd_uidl_init(client, 0);
+		list_uids_iter(client, ctx);
 	} else {
 		unsigned int msgnum;
 
 		if (get_msgnum(client, args, &msgnum) == NULL)
 			return FALSE;
 
-		list_uids(client, msgnum+1);
+		ctx = cmd_uidl_init(client, msgnum+1);
+		if (list_uids_iter(client, ctx))
+			client_send_line(client, "-ERR Message not found.");
 	}
 
 	return TRUE;



More information about the dovecot-cvs mailing list