dovecot: Added concatenation input stream.

dovecot at dovecot.org dovecot at dovecot.org
Sat Oct 20 19:17:05 EEST 2007


details:   http://hg.dovecot.org/dovecot/rev/026f67ecd858
changeset: 6561:026f67ecd858
user:      Timo Sirainen <tss at iki.fi>
date:      Sat Oct 20 19:16:25 2007 +0300
description:
Added concatenation input stream.

diffstat:

3 files changed, 258 insertions(+)
src/lib/Makefile.am      |    2 
src/lib/istream-concat.c |  249 ++++++++++++++++++++++++++++++++++++++++++++++
src/lib/istream-concat.h |    7 +

diffs (283 lines):

diff -r 9036aa956d14 -r 026f67ecd858 src/lib/Makefile.am
--- a/src/lib/Makefile.am	Sat Oct 20 19:15:59 2007 +0300
+++ b/src/lib/Makefile.am	Sat Oct 20 19:16:25 2007 +0300
@@ -36,6 +36,7 @@ liblib_a_SOURCES = \
 	imem.c \
 	iostream.c \
 	istream.c \
+	istream-concat.c \
 	istream-crlf.c \
 	istream-data.c \
 	istream-file.c \
@@ -131,6 +132,7 @@ headers = \
 	imem.h \
 	iostream-internal.h \
 	istream.h \
+	istream-concat.h \
 	istream-crlf.h \
 	istream-internal.h \
 	istream-seekable.h \
diff -r 9036aa956d14 -r 026f67ecd858 src/lib/istream-concat.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/istream-concat.c	Sat Oct 20 19:16:25 2007 +0300
@@ -0,0 +1,249 @@
+/* Copyright (c) 2007 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "istream-internal.h"
+#include "istream-concat.h"
+
+struct concat_istream {
+	struct istream_private istream;
+
+	struct istream **input, *cur_input;
+	uoff_t *input_size;
+
+	unsigned int cur_idx, unknown_size_idx;
+	size_t prev_size;
+};
+
+static void i_stream_concat_close(struct iostream_private *stream)
+{
+	struct concat_istream *cstream = (struct concat_istream *)stream;
+	unsigned int i;
+
+	for (i = 0; cstream->input[i] != NULL; i++)
+		i_stream_close(cstream->input[i]);
+}
+
+static void i_stream_concat_destroy(struct iostream_private *stream)
+{
+	struct concat_istream *cstream = (struct concat_istream *)stream;
+	unsigned int i;
+
+	for (i = 0; cstream->input[i] != NULL; i++)
+		i_stream_unref(&cstream->input[i]);
+}
+
+static void
+i_stream_concat_set_max_buffer_size(struct iostream_private *stream,
+				    size_t max_size)
+{
+	struct concat_istream *cstream = (struct concat_istream *)stream;
+	unsigned int i;
+
+	cstream->istream.max_buffer_size = max_size;
+	for (i = 0; cstream->input[i] != NULL; i++)
+		i_stream_set_max_buffer_size(cstream->input[i], max_size);
+}
+
+static void i_stream_concat_read_next(struct concat_istream *cstream)
+{
+	const unsigned char *data;
+	size_t data_size, size;
+
+	i_assert(cstream->cur_input->eof);
+
+	cstream->cur_idx++;
+	cstream->cur_input = cstream->input[cstream->cur_idx];
+
+	if (cstream->istream.pos == cstream->istream.skip)
+		return;
+
+	/* we need to keep the current data */
+	data = cstream->istream.buffer + cstream->istream.skip;
+	data_size = cstream->istream.pos - cstream->istream.skip;
+
+	/* we already verified that the data size is less than the
+	   maximum buffer size */
+	if (!i_stream_get_buffer_space(&cstream->istream, data_size, &size))
+		i_unreached();
+	i_assert(size >= data_size);
+
+	cstream->prev_size = data_size;
+	memcpy(cstream->istream.w_buffer, data, data_size);
+}
+
+static ssize_t i_stream_concat_read(struct istream_private *stream)
+{
+	struct concat_istream *cstream = (struct concat_istream *)stream;
+	const unsigned char *data;
+	size_t size, pos, skip;
+	ssize_t ret;
+	bool last_stream;
+
+	if (cstream->cur_input == NULL)
+		return -1;
+
+	skip = stream->skip;
+	if (cstream->prev_size > 0) {
+		if (stream->skip < cstream->prev_size) {
+			cstream->prev_size -= stream->skip;
+			skip = 0;
+		} else {
+			/* we don't need the buffer anymore */
+			skip -= cstream->prev_size;
+			stream->skip -= cstream->prev_size;
+			cstream->prev_size = 0;
+
+			i_free_and_null(stream->w_buffer);
+			stream->buffer = NULL;
+			stream->buffer_size = 0;
+		}
+	} 
+	i_stream_skip(cstream->cur_input, skip);
+
+	data = i_stream_get_data(cstream->cur_input, &pos);
+	if (pos > stream->pos)
+		ret = 0;
+	else {
+		/* need to read more */
+		ret = i_stream_read(cstream->cur_input);
+		if (ret == -2 || ret == 0)
+			return ret;
+
+		if (ret == -1 && stream->istream.stream_errno != 0) {
+			stream->istream.stream_errno =
+				cstream->cur_input->stream_errno;
+			return -1;
+		}
+
+		/* we either read something or we're at EOF */
+		last_stream = cstream->input[cstream->cur_idx+1] == NULL;
+		if (ret == -1 && !last_stream) {
+			if (stream->pos - stream->skip >=
+			    stream->max_buffer_size)
+				return -2;
+
+			i_stream_concat_read_next(cstream);
+			last_stream =
+				cstream->input[cstream->cur_idx+1] == NULL;
+		}
+
+		stream->istream.eof = cstream->cur_input->eof && last_stream;
+		data = i_stream_get_data(cstream->cur_input, &pos);
+	}
+
+	if (stream->w_buffer == NULL) {
+		stream->buffer = data;
+		stream->pos -= stream->skip;
+		stream->skip = 0;
+	} else {
+		if (!i_stream_get_buffer_space(stream, pos, &size))
+			return -2;
+		memcpy(stream->w_buffer + stream->pos, data, I_MIN(size, pos));
+	}
+
+	ret = pos > stream->pos ? (ssize_t)(pos - stream->pos) :
+		(ret == 0 ? 0 : -1);
+	stream->pos = pos;
+	return ret;
+}
+
+static unsigned int
+find_v_offset(struct concat_istream *cstream, uoff_t *v_offset)
+{
+	const struct stat *st;
+	unsigned int i;
+
+	for (i = 0; cstream->input[i] != NULL; i++) {
+		if (*v_offset == 0) {
+			/* seek to beginning of this stream */
+			break;
+		}
+		if (i == cstream->unknown_size_idx) {
+			/* we'll need to figure out this stream's size */
+			st = i_stream_stat(cstream->input[i], TRUE);
+			if (st == NULL) {
+				cstream->istream.istream.stream_errno =
+					cstream->input[i]->stream_errno;
+				return (unsigned int)-1;
+			}
+
+			/* @UNSAFE */
+			cstream->input_size[i] = st->st_size;
+			cstream->unknown_size_idx = i + 1;
+		}
+		if (*v_offset < cstream->input_size[i])
+			break;
+		*v_offset -= cstream->input_size[i];
+	}
+
+	return i;
+}
+
+static void i_stream_concat_seek(struct istream_private *stream,
+				 uoff_t v_offset, bool mark ATTR_UNUSED)
+{
+	struct concat_istream *cstream = (struct concat_istream *)stream;
+
+	stream->istream.stream_errno = 0;
+	stream->istream.v_offset = v_offset;
+	stream->skip = stream->pos = 0;
+
+	cstream->cur_idx = find_v_offset(cstream, &v_offset);
+	if (cstream->cur_idx == (unsigned int)-1) {
+		cstream->cur_input = NULL;
+		return;
+	}
+	cstream->cur_input = cstream->input[cstream->cur_idx];
+	i_stream_seek(cstream->cur_input, v_offset);
+}
+
+static const struct stat *
+i_stream_concat_stat(struct istream_private *stream, bool exact ATTR_UNUSED)
+{
+	struct concat_istream *cstream = (struct concat_istream *)stream;
+	uoff_t v_offset = (uoff_t)-1;
+	unsigned int i;
+
+	/* make sure we have all sizes */
+	(void)find_v_offset(cstream, &v_offset);
+
+	stream->statbuf.st_size = 0;
+	for (i = 0; i < cstream->unknown_size_idx; i++)
+		stream->statbuf.st_size += cstream->input_size[i];
+	return &stream->statbuf;
+}
+
+struct istream *i_stream_create_concat(struct istream *input[])
+{
+	struct concat_istream *cstream;
+	unsigned int count;
+	size_t max_buffer_size = I_STREAM_MIN_SIZE;
+
+	for (count = 0; input[count] != NULL; count++) {
+		size_t cur_max = input[count]->real_stream->max_buffer_size;
+		if (cur_max > max_buffer_size)
+			max_buffer_size = cur_max;
+		i_stream_ref(input[count]);
+	}
+	i_assert(count != 0);
+
+	cstream = i_new(struct concat_istream, 1);
+	cstream->input = i_new(struct istream *, count + 1);
+	cstream->input_size = i_new(uoff_t, count + 1);
+
+	memcpy(cstream->input, input, sizeof(*input) * count);
+	cstream->cur_input = cstream->input[0];
+
+	cstream->istream.iostream.close = i_stream_concat_close;
+	cstream->istream.iostream.destroy = i_stream_concat_destroy;
+	cstream->istream.iostream.set_max_buffer_size =
+		i_stream_concat_set_max_buffer_size;
+
+	cstream->istream.max_buffer_size = max_buffer_size;
+	cstream->istream.read = i_stream_concat_read;
+	cstream->istream.seek = i_stream_concat_seek;
+	cstream->istream.stat = i_stream_concat_stat;
+
+	return i_stream_create(&cstream->istream, -1, 0);
+}
diff -r 9036aa956d14 -r 026f67ecd858 src/lib/istream-concat.h
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib/istream-concat.h	Sat Oct 20 19:16:25 2007 +0300
@@ -0,0 +1,7 @@
+#ifndef ISTREAM_CONCAT_H
+#define ISTREAM_CONCAT_H
+
+/* Concatenate input streams into a single stream. */
+struct istream *i_stream_create_concat(struct istream *input[]);
+
+#endif


More information about the dovecot-cvs mailing list