From a77e7147c1814b4ed19d6abce417c8d8c627cc32 Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Tue, 13 Dec 2011 14:57:51 +0100 Subject: Initial release --- fastwriter.c | 317 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 317 insertions(+) create mode 100644 fastwriter.c (limited to 'fastwriter.c') diff --git a/fastwriter.c b/fastwriter.c new file mode 100644 index 0000000..70f1043 --- /dev/null +++ b/fastwriter.c @@ -0,0 +1,317 @@ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include + + +#include "private.h" +#include "default.h" +#include "sysinfo.h" + +fastwriter_t *fastwriter_init(const char *fs, fastwriter_flags_t flags) { + fastwriter_t *ctx; + + ctx = (fastwriter_t*)malloc(sizeof(fastwriter_t)); + if (!ctx) return ctx; + + memset(ctx, 0, sizeof(fastwriter_t)); + ctx->params.flags = flags; + ctx->api = &fastwriter_default_api; + + return ctx; +} + +void fastwriter_destroy(fastwriter_t *ctx) { + free(ctx); +} + +int fastwriter_set_buffer_size(fastwriter_t *ctx, size_t buffer_size) { + ctx->params.buffer_size = buffer_size; + + return 0; +} + +static void *fastwriter_writer_thread(void *user); + +int fastwriter_open(fastwriter_t *ctx, const char *name, fastwriter_flags_t flags) { + int i; + int err; + int e[4]; + + ctx->flags = flags | ctx->params.flags; + + switch (ctx->params.buffer_size) { + case FASTWRITER_BUFFER_DEFAULT: + ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE; + break; + case FASTWRITER_BUFFER_MAX: + ctx->size = get_free_memory(); + + if ((ctx->size - FASTWRITER_RESERVE_MEMORY) < FASTWRITER_DEFAULT_BUFFER_SIZE) + ctx->size = FASTWRITER_DEFAULT_BUFFER_SIZE; + else + ctx->size -= FASTWRITER_RESERVE_MEMORY; + + break; + default: + ctx->size = ctx->params.buffer_size; + } + + ctx->buffer = malloc(ctx->size); + if (!ctx->buffer) { + fastwriter_close(ctx); + return ENOMEM; + } + ctx->err = 0; + ctx->written = 0; + ctx->commited = 0; + ctx->chunked = 0; + + ctx->tail = 0; + ctx->head = 0; + ctx->pos = 0; + + err = ctx->api->open(ctx, name, ctx->flags); + if (err) { + fastwriter_close(ctx); + return err; + } + + e[0] = pthread_mutex_init(&ctx->data_cond_mutex, NULL); + e[1] = pthread_mutex_init(&ctx->space_cond_mutex, NULL); + e[2] = pthread_cond_init(&ctx->data_cond, NULL); + e[3] = pthread_cond_init(&ctx->space_cond, NULL); + + if (e[0]|e[1]|e[2]|e[3]) { + if (!e[3]) pthread_cond_destroy(&ctx->space_cond); + if (!e[2]) pthread_cond_destroy(&ctx->data_cond); + if (!e[1]) pthread_mutex_destroy(&ctx->space_cond_mutex); + if (!e[0]) pthread_mutex_destroy(&ctx->data_cond_mutex); + + fastwriter_close(ctx); + + for (i = 0; i < 4; i++) + if (e[i]) return e[i]; + } + + ctx->clean_locks = 1; + ctx->run_flag = 1; + + err = pthread_create(&ctx->wthread, NULL, &fastwriter_writer_thread, ctx); + if (err) { + ctx->run_flag = 0; + fastwriter_close(ctx); + return err; + } + + return 0; +} + +int fastwriter_close(fastwriter_t *ctx) { + if ((!ctx->err)&&(ctx->pos != ctx->head)) + return EBADFD; + + if (ctx->run_flag) { + ctx->run_flag = 0; + + pthread_mutex_lock(&ctx->data_cond_mutex); + pthread_cond_broadcast(&ctx->data_cond); + pthread_mutex_unlock(&ctx->data_cond_mutex); + + pthread_join(ctx->wthread, NULL); + } + + if (ctx->clean_locks) { + pthread_cond_destroy(&ctx->space_cond); + pthread_cond_destroy(&ctx->data_cond); + pthread_mutex_destroy(&ctx->space_cond_mutex); + pthread_mutex_destroy(&ctx->data_cond_mutex); + + ctx->clean_locks = 0; + } + + ctx->api->close(ctx); + + if (ctx->buffer) { + free(ctx->buffer); + ctx->buffer = NULL; + } + + return 0; + +} + + +static inline size_t fastwriter_compute_free_space(fastwriter_t *ctx) { + if (ctx->pos < ctx->tail) return ctx->tail - ctx->pos; + return ctx->tail + ctx->size - ctx->pos - 1; +} + +int fastwriter_get_stats(fastwriter_t *ctx, fastwriter_stats_t *stats) { + stats->buffer_size = ctx->size; + stats->buffer_used = ctx->size - fastwriter_compute_free_space(ctx); + stats->buffer_max = ctx->max_usage; + stats->commited = ctx->commited; + stats->written = ctx->written; + return 0; +} + + +static void *fastwriter_writer_thread(void *user) { + int err = 0; + fastwriter_write_flags_t flags; + size_t size; + size_t head; + + fastwriter_t *ctx = (fastwriter_t*)user; + + while ((ctx->run_flag)||(ctx->head != ctx->tail)) { + if (ctx->head != ctx->tail) { + head = ctx->head; + + if (head > ctx->tail) { + size = head - ctx->tail; + flags = FASTWRITER_WRITE_FLAGS_DEFAULT; + } else { + size = ctx->size - ctx->tail; + flags = FASTWRITER_WRITE_FLAG_FORCE; + } + + if (!ctx->run_flag) + flags |= FASTWRITER_WRITE_FLAG_FORCE; + + err = ctx->api->write(ctx, flags, size, ctx->buffer + ctx->tail, &size); + if (err) { + ctx->err = err; + ctx->run_flag = 0; + + pthread_mutex_lock(&ctx->space_cond_mutex); + pthread_cond_broadcast(&ctx->space_cond); + pthread_mutex_unlock(&ctx->space_cond_mutex); + + return NULL; + } + + if (size > 0) { + ctx->written += size; + + size += ctx->tail; + if (size == ctx->size) ctx->tail = 0; + else ctx->tail = size; + + pthread_mutex_lock(&ctx->space_cond_mutex); + pthread_cond_broadcast(&ctx->space_cond); + pthread_mutex_unlock(&ctx->space_cond_mutex); + } else { + pthread_mutex_lock(&ctx->data_cond_mutex); + while ((ctx->run_flag)&&(ctx->head == head)) { + pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex); + } + } + } else { + pthread_mutex_lock(&ctx->data_cond_mutex); + while ((ctx->run_flag)&&(ctx->head == ctx->tail)) { + pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex); + } + } + } + + return NULL; +} + + +int fastwriter_push_chunk(fastwriter_t *ctx, size_t size, const void *data) { + size_t part1, end; + size_t free = fastwriter_compute_free_space(ctx); + + if (free < size) { + ctx->max_usage = ctx->size; + + if ((ctx->flags&FASTWRITER_FLAGS_BLOCK)==0) + return EWOULDBLOCK; + + pthread_mutex_lock(&ctx->space_cond_mutex); + while ((ctx->run_flag)&&(fastwriter_compute_free_space(ctx) < size)) { + pthread_cond_wait(&ctx->space_cond, &ctx->space_cond_mutex); + } + pthread_mutex_unlock(&ctx->space_cond_mutex); + } else { + end = ctx->size - (free - size); + if (end > ctx->max_usage) ctx->max_usage = end; + } + + if (!ctx->run_flag) { + if (ctx->err) return ctx->err; + return EBADFD; + } + + if (ctx->pos < ctx->tail) end = ctx->tail; + else end = ctx->size; + + + part1 = end - ctx->pos; + + if (part1 > size) { + // tail < pos (we have checked for free space) + end = size - part1; + memcpy(ctx->buffer + ctx->pos, data, part1); + memcpy(ctx->buffer, data + part1, end); + ctx->pos = end; + } else { + memcpy(ctx->buffer + ctx->pos, data, size); + ctx->pos += size; + + if (ctx->pos == ctx->size) ctx->pos = 0; + } + + ctx->chunked += size; + + return 0; +} + + +int fastwriter_commit(fastwriter_t *ctx) { + ctx->head = ctx->pos; + + pthread_mutex_lock(&ctx->data_cond_mutex); + pthread_cond_broadcast(&ctx->data_cond); + pthread_mutex_unlock(&ctx->data_cond_mutex); + + ctx->commited += ctx->chunked; + ctx->chunked = 0; + + return 0; +} + + +int fastwriter_cancel(fastwriter_t *ctx) { + ctx->pos = ctx->head; + + ctx->chunked = 0; + + return 0; +} + + +int fastwriter_push_data(fastwriter_t *ctx, size_t size, const void *buf) { + int err; + err = fastwriter_push_chunk(ctx, size, buf); + if (err) return err; + + err = fastwriter_commit(ctx); + if (err) fastwriter_cancel(ctx); + + return err; +} -- cgit v1.2.3