[dovecot-cvs] dovecot/src/lib Makefile.am, 1.49, 1.50 istream-seekable.c, NONE, 1.1 istream-seekable.h, NONE, 1.1

cras at dovecot.org cras at dovecot.org
Tue Mar 29 16:30:58 EEST 2005


Update of /var/lib/cvs/dovecot/src/lib
In directory talvi:/tmp/cvs-serv3483/lib

Modified Files:
	Makefile.am 
Added Files:
	istream-seekable.c istream-seekable.h 
Log Message:
Added istream-seekable, which allows combining multiple input streams and
creating a single seekable stream out of them.



Index: Makefile.am
===================================================================
RCS file: /var/lib/cvs/dovecot/src/lib/Makefile.am,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -d -r1.49 -r1.50
--- Makefile.am	27 Mar 2005 13:51:54 -0000	1.49
+++ Makefile.am	29 Mar 2005 13:30:55 -0000	1.50
@@ -25,6 +25,7 @@
 	istream-file.c \
 	istream-limit.c \
 	istream-mmap.c \
+	istream-seekable.c \
 	ioloop.c \
 	ioloop-notify-none.c \
 	ioloop-notify-dn.c \
@@ -94,6 +95,7 @@
 	iostream-internal.h \
 	istream.h \
 	istream-internal.h \
+	istream-seekable.h \
 	ioloop.h \
 	ioloop-internal.h \
 	lib.h \

--- NEW FILE: istream-seekable.c ---
/* Copyright (C) 2005 Timo Sirainen */

#include "lib.h"
#include "buffer.h"
#include "hex-binary.h"
#include "randgen.h"
#include "write-full.h"
#include "istream-internal.h"
#include "istream-seekable.h"

#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>

#define BUF_INITIAL_SIZE (1024*32)

/* State of a seekable stream built on top of a list of input streams.
   Data read from the inputs is first collected into an in-memory buffer;
   once that buffer reaches max_buffer_size it is copied into an unlinked
   temporary file and all further data is appended to the file. */
struct seekable_istream {
	/* Kept as the first member: the stream callbacks cast their
	   stream argument directly to struct seekable_istream *. */
	struct _istream istream;
	/* Pool used for this struct, the input array, temp_prefix and the
	   memory buffer. Referenced on creation, unreferenced on destroy. */
	pool_t pool;

	/* Memory buffer limit; also handed to the temp file's istream and
	   updated by the set_max_buffer_size callback. */
	size_t max_buffer_size;
	/* Path + filename prefix for the temporary file (private copy). */
	char *temp_prefix;
	/* Number of bytes written to the temporary file so far. */
	uoff_t write_peak;

	/* Everything read so far while in memory mode. Freed and set to
	   NULL when the data is moved to the temporary file. */
	buffer_t *buffer;
	/* input: NULL-terminated array of the source streams.
	   cur_input: the source currently being read, NULL once all of
	   them have reached EOF. */
	struct istream **input, *cur_input;
	/* Stream reading back from the temporary file; NULL while the
	   data is still kept in memory. */
	struct istream *fd_input;
	/* Index into input[] used by read_more() when moving on to the
	   next source stream. */
	unsigned int cur_idx;
	/* Descriptor of the temporary file, used for appending new data. */
	int fd;
};

/* close() callback: closes the temp file stream (if one exists) and every
   source stream. The descriptor itself is not close()d here, it is only
   forgotten; presumably fd_input owns it, since it was created with the
   last i_stream_create_file() argument set to TRUE. */
static void _close(struct _iostream *stream __attr_unused__)
{
	struct seekable_istream *sstream = (struct seekable_istream *)stream;
	struct istream **inputp;

	sstream->fd = -1;

	if (sstream->fd_input != NULL)
		i_stream_close(sstream->fd_input);

	/* input[] is NULL-terminated */
	for (inputp = sstream->input; *inputp != NULL; inputp++)
		i_stream_close(*inputp);
}

/* destroy() callback: releases the memory buffer, drops the references
   held on the temp file stream and on every source stream, then frees
   temp_prefix and gives up the pool reference taken at creation. */
static void _destroy(struct _iostream *stream)
{
	struct seekable_istream *sstream = (struct seekable_istream *)stream;
	struct istream **inputp;

	/* only one of buffer / fd_input normally exists at a time */
	if (sstream->buffer != NULL)
		buffer_free(sstream->buffer);
	if (sstream->fd_input != NULL)
		i_stream_unref(sstream->fd_input);

	/* input[] is NULL-terminated */
	for (inputp = sstream->input; *inputp != NULL; inputp++)
		i_stream_unref(*inputp);

	p_free(sstream->pool, sstream->temp_prefix);
	pool_unref(sstream->pool);
}

/* set_max_buffer_size() callback: remembers the new limit (it decides when
   the memory buffer is moved to a temp file) and forwards it to the temp
   file stream, if any, and to every source stream. */
static void _set_max_buffer_size(struct _iostream *stream, size_t max_size)
{
	struct seekable_istream *sstream = (struct seekable_istream *)stream;
	struct istream **inputp;

	sstream->max_buffer_size = max_size;

	if (sstream->fd_input != NULL)
		i_stream_set_max_buffer_size(sstream->fd_input, max_size);

	/* input[] is NULL-terminated */
	for (inputp = sstream->input; *inputp != NULL; inputp++)
		i_stream_set_max_buffer_size(*inputp, max_size);
}

/* Moves the stream from memory mode to temp file mode.

   Creates a uniquely named file "<temp_prefix>.<time>.<pid>.<hex>",
   unlinks it immediately so only the descriptor remains, writes the whole
   memory buffer into it and frees the buffer. On success sstream->fd,
   sstream->fd_input and sstream->write_peak are set up and sstream->buffer
   is NULL.

   Returns 0 on success, -1 on failure (the error is logged and the memory
   buffer is left untouched). */
static int copy_to_temp_file(struct seekable_istream *sstream)
{
	unsigned char rnd[8];
	struct stat st;
	const char *tmp_path = NULL;
	int tmp_fd = -1;

	/* keep generating names until we manage to create a new file */
	while (tmp_fd == -1) {
		random_fill_weak(rnd, sizeof(rnd));
		tmp_path = t_strconcat(sstream->temp_prefix, ".",
				       dec2str(time(NULL)), ".",
				       dec2str(getpid()), ".",
				       binary_to_hex(rnd, sizeof(rnd)),
				       NULL);

		/* name already taken, try another one */
		if (stat(tmp_path, &st) == 0)
			continue;

		if (errno != ENOENT) {
			i_error("stat(%s) failed: %m", tmp_path);
			return -1;
		}

		/* O_EXCL: someone may have created the file after stat() */
		tmp_fd = open(tmp_path, O_RDWR | O_EXCL | O_CREAT, 0600);
		if (tmp_fd == -1 && errno != EEXIST) {
			i_error("open(%s) failed: %m", tmp_path);
			return -1;
		}
	}

	/* only the descriptor is needed, the name can go away */
	if (unlink(tmp_path) < 0) {
		/* shouldn't happen.. */
		i_error("unlink(%s) failed: %m", tmp_path);
		goto fail;
	}

	/* save everything buffered so far */
	if (write_full(tmp_fd, sstream->buffer->data,
		       sstream->buffer->used) < 0) {
		i_error("write_full(%s) failed: %m", tmp_path);
		goto fail;
	}
	sstream->write_peak = sstream->buffer->used;

	/* from now on the file is the only copy of the data */
	buffer_free(sstream->buffer);
	sstream->buffer = NULL;

	sstream->fd = tmp_fd;
	sstream->fd_input =
		i_stream_create_file(tmp_fd, sstream->pool,
				     sstream->max_buffer_size, TRUE);
	return 0;

fail:
	(void)close(tmp_fd);
	return -1;
}

/* Reads more data into the current source stream, moving on to the next
   source whenever the current one reaches EOF.

   Returns the number of unread bytes the caller can fetch from
   sstream->cur_input with i_stream_get_data() (> 0), 0 if the underlying
   read returned 0, or -1 on error / when every source is exhausted. On
   final EOF our own eof flag is set; on a read failure the source's
   stream_errno is copied to our stream. */
static ssize_t read_more(struct seekable_istream *sstream)
{
	size_t size;
	ssize_t ret;

	if (sstream->cur_input == NULL) {
		/* all sources were already consumed */
		sstream->istream.istream.eof = TRUE;
		return -1;
	}

	while ((ret = i_stream_read(sstream->cur_input)) < 0) {
		if (!sstream->cur_input->eof) {
			/* error */
			sstream->istream.istream.stream_errno =
				sstream->cur_input->stream_errno;
			return -1;
		}

		/* go to next stream. cur_idx is the index of cur_input, so
		   it must be incremented before indexing; a post-increment
		   would pick input[0] a second time on the first switch. */
		sstream->cur_input = sstream->input[++sstream->cur_idx];
		if (sstream->cur_input == NULL) {
			/* last one, EOF */
			sstream->istream.istream.eof = TRUE;
			return -1;
		}

		/* the new stream may already hold buffered data. Hand it out
		   before reading: if the read hit EOF right away we would
		   skip to the following stream and lose these bytes. */
		(void)i_stream_get_data(sstream->cur_input, &size);
		if (size != 0)
			return (ssize_t)size;
	}
	return ret;
}

/* Serves a read from the in-memory buffer, pulling more data from the
   source streams when the caller has already seen everything buffered.

   Returns TRUE when the read was handled; *ret then holds the value to
   return from the read callback (bytes added to the visible data, or the
   <= 0 result of read_more()). Returns FALSE when the memory buffer has
   reached max_buffer_size and the caller has to switch to a temporary
   file; *ret is not meaningful in that case. */
static int read_from_buffer(struct seekable_istream *sstream, ssize_t *ret)
{
	struct _istream *stream = &sstream->istream;
	const unsigned char *data;
	size_t size, pos, offset;

	/* Loop instead of reading once: after a forward seek/skip v_offset
	   can be beyond the buffered data, and a single read may not catch
	   up. Continuing with used < v_offset would make the subtraction
	   below wrap around. In the usual case (the end of the visible data
	   equals buffer->used) this runs exactly once. */
	while (stream->istream.v_offset +
	       (stream->pos - stream->skip) >= sstream->buffer->used) {
		/* need to read more */
		if (sstream->buffer->used >= sstream->max_buffer_size)
			return FALSE;

		/* read more to buffer */
		*ret = read_more(sstream);
		if (*ret <= 0)
			return TRUE;

		/* we should have more now. take all of it, so the source
		   stream's own buffer never fills up */
		data = i_stream_get_data(sstream->cur_input, &size);
		buffer_append(sstream->buffer, data, size);
		i_stream_skip(sstream->cur_input, size);
	}

	/* expose everything from v_offset to the end of the buffer. the
	   pointer is recomputed on every call because appending may have
	   moved the buffer's data. */
	offset = stream->istream.v_offset;
	stream->buffer = CONST_PTR_OFFSET(sstream->buffer->data, offset);
	pos = sstream->buffer->used - offset;

	*ret = pos - stream->pos;
	stream->pos = pos;
	return TRUE;
}

/* read() callback.

   While the data still fits in memory the read is served by
   read_from_buffer(). Once the memory limit is reached everything is moved
   to a temporary file; after that new data from the source streams is
   appended to the file and the visible data is whatever fd_input (a file
   stream over the same descriptor) returns when read from v_offset.

   Returns the number of bytes added to the visible data, or the <= 0
   result of the failing read. The stream is closed if the temporary file
   can't be created or written. */
static ssize_t _read(struct _istream *stream)
{
	struct seekable_istream *sstream = (struct seekable_istream *)stream;
	const unsigned char *data;
	size_t size;
	ssize_t ret;

	/* drop the consumed bytes from the view. the buffer pointer has to
	   move along with it, otherwise a read that returns early below
	   leaves skip=0 with a pointer to already consumed data. */
	if (stream->skip != 0) {
		stream->buffer = CONST_PTR_OFFSET(stream->buffer, stream->skip);
		stream->pos -= stream->skip;
		stream->skip = 0;
	}

	if (sstream->buffer != NULL) {
		if (read_from_buffer(sstream, &ret))
			return ret;

		/* copy everything to temp file and use it as the stream */
		if (copy_to_temp_file(sstream) < 0) {
			i_stream_close(&stream->istream);
			return -1;
		}

		/* the memory buffer was freed, so the old view points to
		   released memory. forget it; the same bytes are read back
		   from the file below. */
		stream->buffer = NULL;
		stream->pos = 0;
	}

	while (stream->istream.v_offset + stream->pos >= sstream->write_peak) {
		/* need to read more */
		ret = read_more(sstream);
		if (ret <= 0)
			return ret;

		/* save to our file */
		data = i_stream_get_data(sstream->cur_input, &size);
		if (write_full(sstream->fd, data, size) < 0) {
			i_error("write_full(%s...) failed: %m",
				sstream->temp_prefix);
			i_stream_close(&stream->istream);
			return -1;
		}
		i_stream_skip(sstream->cur_input, size);
		sstream->write_peak += size;
	}

	/* the file now has data beyond our view. re-read from v_offset
	   until fd_input has more than we currently show.
	   NOTE(review): assumes that seeking fd_input empties its buffer
	   and that repeated reads without skipping accumulate data until
	   its max buffer size - confirm against istream-file. */
	i_stream_seek(sstream->fd_input, stream->istream.v_offset);
	do {
		ret = i_stream_read(sstream->fd_input);
		data = i_stream_get_data(sstream->fd_input, &size);
	} while (ret > 0 && size <= stream->pos);

	/* publish fd_input's data as ours; without this the caller never
	   sees anything that was read back from the file */
	stream->buffer = data;
	if (size > stream->pos) {
		ret = size - stream->pos;
		stream->pos = size;
		return ret;
	}

	/* nothing new: the loop above ended with ret <= 0. show only what
	   fd_input really holds and pass its state on. */
	stream->pos = size;
	stream->istream.eof = sstream->fd_input->eof;
	stream->istream.stream_errno = sstream->fd_input->stream_errno;
	return ret;
}

/* seek() callback. All data is kept either in the memory buffer or in the
   temporary file, so seeking only records the new offset; the next read
   fetches the data for it.

   The visible window has to be emptied as well: it describes data at the
   old offset, and leaving skip/pos untouched would hand those bytes out
   as if they were located at v_offset. */
static void _seek(struct _istream *stream, uoff_t v_offset)
{
	stream->istream.v_offset = v_offset;
	stream->skip = stream->pos = 0;
}

/* stat() callback. The total size is known only after every source stream
   has been read, so while the stream is still in memory mode this first
   reads it to the end and then seeks back to the original offset.

   Returns fd_input's stat if a temporary file is in use, otherwise our
   own statbuf with st_size set to the amount of buffered data. Panics if
   a read returns 0, i.e. when used on a non-blocking stream. */
static const struct stat *_stat(struct _istream *stream)
{
	struct seekable_istream *sstream = (struct seekable_istream *)stream;
	uoff_t old_offset;
	ssize_t ret;

	if (sstream->buffer != NULL) {
		old_offset = stream->istream.v_offset;
		do {
			/* consume everything that is currently visible.
			   skipping stream->skip bytes (which is 0 after each
			   read) would never consume anything, so the loop
			   could stop early on a full buffer or keep
			   re-reading the same data. */
			i_stream_skip(&stream->istream,
				      stream->pos - stream->skip);
		} while ((ret = _read(stream)) > 0);

		if (ret == 0)
			i_panic("get_size() used for non-blocking stream");
		i_stream_seek(&stream->istream, old_offset);
	}

	/* reading above may have moved the data to a temporary file */
	if (sstream->fd_input != NULL)
		return i_stream_stat(sstream->fd_input);

	stream->statbuf.st_size = sstream->buffer->used;
	return &stream->statbuf;
}

/* Creates a seekable stream that returns the contents of the given input
   streams one after another.

   input[] is a NULL-terminated array with at least one stream (asserted).
   Each stream gets an extra reference and the array itself is copied, so
   the caller's array isn't needed after the call. pool is referenced and
   used for all allocations. Data is kept in memory until max_buffer_size
   is reached, after which it is moved to a temporary file whose name
   begins with temp_prefix (the string is copied).

   Data the first input stream already holds in its buffer is taken over
   immediately. */
struct istream *
i_stream_create_seekable(struct istream *input[], pool_t pool,
			 size_t max_buffer_size, const char *temp_prefix)
{
	struct seekable_istream *sstream;
	const unsigned char *data;
	unsigned int count;
	size_t size;

	for (count = 0; input[count] != NULL; count++)
		i_stream_ref(input[count]);
	i_assert(count != 0);

	pool_ref(pool);
	sstream = p_new(pool, struct seekable_istream, 1);
	sstream->pool = pool;
	sstream->temp_prefix = p_strdup(pool, temp_prefix);
	sstream->buffer = buffer_create_dynamic(pool, BUF_INITIAL_SIZE);
	sstream->max_buffer_size = max_buffer_size;
	/* no temporary file yet. the zero left by p_new() is a valid
	   descriptor, so mark it unset the same way _close() does. */
	sstream->fd = -1;

	/* count + 1 entries: the last one stays NULL as the terminator */
	sstream->input = p_new(pool, struct istream *, count + 1);
	memcpy(sstream->input, input, sizeof(*input) * count);
	sstream->cur_input = sstream->input[0];

	/* initialize our buffer from first stream's pending data */
	data = i_stream_get_data(sstream->cur_input, &size);
	buffer_append(sstream->buffer, data, size);
	i_stream_skip(sstream->cur_input, size);

	sstream->istream.iostream.close = _close;
	sstream->istream.iostream.destroy = _destroy;
	sstream->istream.iostream.set_max_buffer_size = _set_max_buffer_size;

	sstream->istream.read = _read;
	sstream->istream.seek = _seek;
	sstream->istream.stat = _stat;

	return _i_stream_create(&sstream->istream, pool, -1, 0);
}

--- NEW FILE: istream-seekable.h ---
#ifndef __ISTREAM_SEEKABLE_H
#define __ISTREAM_SEEKABLE_H

/* Create a seekable stream from given NULL-terminated list of input streams.
   Try to keep it in memory, but use a temporary file if it's too large
   (i.e. once the buffered data reaches max_buffer_size).

   The list must contain at least one stream. The created stream takes its
   own reference to each input stream and to the pool.

   temp_prefix is used as path and filename prefix for creating the file.
   The file name is "<temp_prefix>.<timestamp>.<PID>.<random>", where
   <random> is 64 bits (8 bytes) of weak randomness written in hex. The
   file is unlinked right after it has been created. */
struct istream *
i_stream_create_seekable(struct istream *input[], pool_t pool,
			 size_t max_buffer_size, const char *temp_prefix);

#endif



More information about the dovecot-cvs mailing list