summaryrefslogtreecommitdiffstats
path: root/fastwriter.c
diff options
context:
space:
mode:
Diffstat (limited to 'fastwriter.c')
-rw-r--r--fastwriter.c27
1 files changed, 17 insertions, 10 deletions
diff --git a/fastwriter.c b/fastwriter.c
index 3e63468..529acd6 100644
--- a/fastwriter.c
+++ b/fastwriter.c
@@ -68,9 +68,12 @@ int fastwriter_open(fastwriter_t *ctx, const char *name, fastwriter_flags_t flag
default:
ctx->size = ctx->params.buffer_size;
}
-
- ctx->buffer = malloc(ctx->size);
- if (!ctx->buffer) {
+
+ if (ctx->size%FASTWRITER_SYNCIO_ALIGN)
+ ctx->size += FASTWRITER_SYNCIO_ALIGN - (ctx->size%FASTWRITER_SYNCIO_ALIGN);
+
+ err = posix_memalign(&ctx->buffer, FASTWRITER_SYNCIO_ALIGN, ctx->size);
+ if ((err)||(!ctx->buffer)) {
fastwriter_close(ctx);
return ENOMEM;
}
@@ -149,7 +152,7 @@ int fastwriter_close(fastwriter_t *ctx) {
ctx->buffer = NULL;
}
- return 0;
+ return ctx->err;
}
@@ -177,6 +180,7 @@ static void *fastwriter_writer_thread(void *user) {
fastwriter_t *ctx = (fastwriter_t*)user;
+
while ((ctx->run_flag)||(ctx->head != ctx->tail)) {
if (ctx->head != ctx->tail) {
head = ctx->head;
@@ -219,12 +223,14 @@ static void *fastwriter_writer_thread(void *user) {
while ((ctx->run_flag)&&(ctx->head == head)) {
pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
}
+ pthread_mutex_unlock(&ctx->data_cond_mutex);
}
} else {
pthread_mutex_lock(&ctx->data_cond_mutex);
while ((ctx->run_flag)&&(ctx->head == ctx->tail)) {
pthread_cond_wait(&ctx->data_cond, &ctx->data_cond_mutex);
}
+ pthread_mutex_unlock(&ctx->data_cond_mutex);
}
}
@@ -251,19 +257,19 @@ int fastwriter_push(fastwriter_t *ctx, size_t size, const void *data) {
end = ctx->size - (free - size);
if (end > ctx->max_usage) ctx->max_usage = end;
}
-
+
if (!ctx->run_flag) {
if (ctx->err) return ctx->err;
return EBADFD;
}
-
+
if (ctx->pos < ctx->tail) end = ctx->tail;
else end = ctx->size;
-
+
part1 = end - ctx->pos;
-
- if (part1 > size) {
+
+ if (part1 < size) {
// tail < pos (we have checked for free space)
end = size - part1;
memcpy(ctx->buffer + ctx->pos, data, part1);
@@ -307,9 +313,10 @@ int fastwriter_cancel(fastwriter_t *ctx) {
int fastwriter_push_data(fastwriter_t *ctx, size_t size, const void *buf) {
int err;
+
err = fastwriter_push(ctx, size, buf);
if (err) return err;
-
+
err = fastwriter_commit(ctx);
if (err) fastwriter_cancel(ctx);