diff options
author | Suren A. Chilingaryan <csa@dside.dyndns.org> | 2011-12-13 17:54:10 +0100 |
---|---|---|
committer | Suren A. Chilingaryan <csa@dside.dyndns.org> | 2011-12-13 17:54:10 +0100 |
commit | 9c14774f2b6b22628a8b57b7a1e5edec1e236f9c (patch) | |
tree | 115dcad2c883ed6ba19f3d3c2bec4622ad07a279 | |
parent | 77c5312bac656dc916db91ab97054f80b15eb2f4 (diff) | |
download | fastwriter-9c14774f2b6b22628a8b57b7a1e5edec1e236f9c.tar.gz fastwriter-9c14774f2b6b22628a8b57b7a1e5edec1e236f9c.tar.bz2 fastwriter-9c14774f2b6b22628a8b57b7a1e5edec1e236f9c.tar.xz fastwriter-9c14774f2b6b22628a8b57b7a1e5edec1e236f9c.zip |
Few synchronization and alignment related fixes
-rw-r--r-- | default.c | 51 | ||||
-rw-r--r-- | fastwriter.c | 27 | ||||
-rw-r--r-- | private.h | 1 |
3 files changed, 56 insertions, 23 deletions
@@ -40,6 +40,8 @@ typedef struct { int fd; + + int sync_mode; size_t prior_size; /**< original size of file */ size_t preallocated; /**< preallocated bytes */ @@ -56,9 +58,6 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags int open_flags = (O_CREAT|O_WRONLY|O_NOATIME|O_LARGEFILE); int open_mode = (S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); -#ifdef SYNC_MODE - open_flags |= O_DIRECT;//|O_SYNC; -#endif /* SYNC_MODE */ fastwriter_default_t *ctx; @@ -72,9 +71,17 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags fw->ctx = ctx; +#ifdef SYNC_MODE + open_flags |= O_DIRECT; + ctx->sync_mode = 1; +#endif /* SYNC_MODE */ + + ctx->prior_size = 0; + if (!strcmp(fs, "raw")) { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = 0; + ctx->prior_size = (size_t)-1; } else if (!strcmp(fs, "ext4")) { ctx->wr_block = EXT4_WRITEBLOCK; ctx->pa_block = EXT4_PREALLOCATE; @@ -95,13 +102,21 @@ int fastwriter_default_open(fastwriter_t *fw, const char *name, fastwriter_flags ctx->fd = open(name, open_flags, open_mode); if (ctx->fd < 0) return errno; - ctx->prior_size = 0; - -#ifndef HAVE_LINUX_FALLOC_H if (((open_flags&FASTWRITER_FLAGS_OVERWRITE)==0)&&(strcmp(fs, "raw"))) { ctx->prior_size = lseek(ctx->fd, 0, SEEK_END); +# ifdef SYNC_MODE + if (ctx->prior_size%FASTWRITER_SYNCIO_ALIGN) { + close(ctx->fd); + + ctx->fd = open(name, open_flags&~O_DIRECT, open_mode); + if (ctx->fd < 0) return errno; + + ctx->prior_size = lseek(ctx->fd, 0, SEEK_END); + + ctx->sync_mode = 0; + } +# endif /* SYNC_MODE */ } -#endif /* HAVE_LINUX_FALLOC_H */ ctx->preallocated = 0; @@ -114,11 +129,11 @@ void fastwriter_default_close(fastwriter_t *fw) { fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx; if (ctx->fd >= 0) { -#ifndef HAVE_LINUX_FALLOC_H - if (ctx->prior_size) { - ftrucate(ctx->fd, ctx->prior_size + fw->written); +#if defined(SYNC_MODE)||!defined(HAVE_LINUX_FALLOC_H) + if (ctx->prior_size != (size_t)-1) { + ftruncate(ctx->fd, ctx->prior_size + fw->written); } -#endif /* HAVE_LINUX_FALLOC_H */ +#endif close(ctx->fd); } @@ -130,9 +145,10 @@ void fastwriter_default_close(fastwriter_t *fw) { int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, size_t size, void *data, size_t *written) { size_t sum = 0; + size_t delta = 0; ssize_t res; fastwriter_default_t *ctx = (fastwriter_default_t*)fw->ctx; - + if ((flags&FASTWRITER_WRITE_FLAG_FORCE)==0) { if (size < ctx->wr_block) { *written = 0; @@ -141,6 +157,7 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s size -= size % ctx->wr_block; } + if ((ctx->pa_block)&&((fw->written + size) > ctx->preallocated)) { #ifdef HAVE_LINUX_FALLOC_H @@ -153,9 +170,17 @@ int fastwriter_default_write(fastwriter_t *fw, fastwriter_write_flags_t flags, s ctx->preallocated += ctx->pa_block; } } + +#ifdef SYNC_MODE + // we expect this to happen only at last iteration (buffer is multiply of the required align) + if ((ctx->sync_mode)&&(size%FASTWRITER_SYNCIO_ALIGN)) { + delta = FASTWRITER_SYNCIO_ALIGN - size%FASTWRITER_SYNCIO_ALIGN; + } +#endif /* SYNC_MODE */ do { - res = write(ctx->fd, data, size); + res = write(ctx->fd, data + sum, size + delta - sum); +// printf("%i %i %p %zu %i\n", res, ctx->fd, data, size, delta); if (res < 0) { *written = sum; return errno; diff --git a/fastwriter.c b/fastwriter.c index 3e63468..529acd6 100644 --- a/fastwriter.c +++ b/fastwriter.c @@ -68,9 +68,12 @@ int fastwriter_open(fastwriter_t *ctx, const char *name, fastwriter_flags_t flag default: ctx->size = ctx->params.buffer_size; } - - ctx->buffer = malloc(ctx->size); - if (!ctx->buffer) { + + if (ctx->size%FASTWRITER_SYNCIO_ALIGN) + ctx->size += FASTWRITER_SYNCIO_ALIGN - (ctx->size%FASTWRITER_SYNCIO_ALIGN); + + err = posix_memalign(&ctx->buffer, FASTWRITER_SYNCIO_ALIGN, ctx->size); + if ((err)||(!ctx->buffer)) { fastwriter_close(ctx); return ENOMEM; } @@ -149,7 +152,7 @@ int fastwriter_close(fastwriter_t *ctx) { ctx->buffer = NULL; } - return 0; + return ctx->err; } @@ -177,6 +180,7 @@ static void *fastwriter_writer_thread(void *user) { fastwriter_t *ctx = (fastwriter_t*)user; + while ((ctx->run_flag)||(ctx->head != ctx->tail)) { if (ctx->head != ctx->tail) { head = ctx->head; @@ -219,12 +223,14 @@ static void *fastwriter_writer_thread(void *user) { while ((ctx->run_flag)&&(ctx->head == head)) { pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex); } + pthread_mutex_unlock(&ctx->data_cond_mutex); } } else { pthread_mutex_lock(&ctx->data_cond_mutex); while ((ctx->run_flag)&&(ctx->head == ctx->tail)) { pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex); } + pthread_mutex_unlock(&ctx->data_cond_mutex); } } @@ -251,19 +257,19 @@ int fastwriter_push(fastwriter_t *ctx, size_t size, const void *data) { end = ctx->size - (free - size); if (end > ctx->max_usage) ctx->max_usage = end; } - + if (!ctx->run_flag) { if (ctx->err) return ctx->err; return EBADFD; } - + if (ctx->pos < ctx->tail) end = ctx->tail; else end = ctx->size; - + part1 = end - ctx->pos; - - if (part1 > size) { + + if (part1 < size) { // tail < pos (we have checked for free space) end = size - part1; memcpy(ctx->buffer + ctx->pos, data, part1); @@ -307,9 +313,10 @@ int fastwriter_cancel(fastwriter_t *ctx) { int fastwriter_push_data(fastwriter_t *ctx, size_t size, const void *buf) { int err; + err = fastwriter_push(ctx, size, buf); if (err) return err; - + err = fastwriter_commit(ctx); if (err) fastwriter_cancel(ctx); @@ -1,6 +1,7 @@ #ifndef _FASTWRITER_PRIVATE_H #define _FASTWRITER_PRIVATE_H +#define FASTWRITER_SYNCIO_ALIGN 512 #define FASTWRITER_DEFAULT_BUFFER_SIZE 134217728 /* 128 MB */ #define FASTWRITER_RESERVE_MEMORY 536870912 /* 512 MB */ |