From 5ec3695c551f446cb1620ce138d0e3eaefbf1451 Mon Sep 17 00:00:00 2001 From: SagiROosto Date: Wed, 10 Dec 2025 12:34:33 +0200 Subject: [PATCH 1/6] out_logrotate: add logrotate out plugin Signed-off-by: SagiROosto --- CMakeLists.txt | 1 + cmake/plugins_options.cmake | 1 + cmake/windows-setup.cmake | 1 + conf/fluent-bit-logrotate.conf | 21 + plugins/CMakeLists.txt | 1 + plugins/out_logrotate/CMakeLists.txt | 8 + plugins/out_logrotate/logrotate.c | 1643 ++++++++++++++++++++++++++ plugins/out_logrotate/logrotate.h | 32 + 8 files changed, 1708 insertions(+) create mode 100644 conf/fluent-bit-logrotate.conf create mode 100644 plugins/out_logrotate/CMakeLists.txt create mode 100644 plugins/out_logrotate/logrotate.c create mode 100644 plugins/out_logrotate/logrotate.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e586dec2891..7c193ac6806 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -307,6 +307,7 @@ if(FLB_ALL) set(FLB_OUT_NULL 1) set(FLB_OUT_PLOT 1) set(FLB_OUT_FILE 1) + set(FLB_OUT_LOGROTATE 1) set(FLB_OUT_RETRY 1) set(FLB_OUT_TD 1) set(FLB_OUT_STDOUT 1) diff --git a/cmake/plugins_options.cmake b/cmake/plugins_options.cmake index 703073ff3f8..8f5ec837012 100644 --- a/cmake/plugins_options.cmake +++ b/cmake/plugins_options.cmake @@ -117,6 +117,7 @@ DEFINE_OPTION(FLB_OUT_DATADOG "Enable DataDog output plugin" DEFINE_OPTION(FLB_OUT_ES "Enable Elasticsearch output plugin" ON) DEFINE_OPTION(FLB_OUT_EXIT "Enable Exit output plugin" ON) DEFINE_OPTION(FLB_OUT_FILE "Enable file output plugin" ON) +DEFINE_OPTION(FLB_OUT_LOGROTATE "Enable logrotate output plugin" ON) DEFINE_OPTION(FLB_OUT_FLOWCOUNTER "Enable flowcount output plugin" ON) DEFINE_OPTION(FLB_OUT_FORWARD "Enable Forward output plugin" ON) DEFINE_OPTION(FLB_OUT_GELF "Enable GELF output plugin" ON) diff --git a/cmake/windows-setup.cmake b/cmake/windows-setup.cmake index 2791fffb4d9..48a13aa5c71 100644 --- a/cmake/windows-setup.cmake +++ b/cmake/windows-setup.cmake @@ -100,6 +100,7 @@ if(FLB_WINDOWS_DEFAULTS) 
set(FLB_OUT_NATS No) set(FLB_OUT_PLOT No) set(FLB_OUT_FILE Yes) + set(FLB_OUT_LOGROTATE Yes) set(FLB_OUT_TD No) set(FLB_OUT_RETRY No) set(FLB_OUT_SPLUNK Yes) diff --git a/conf/fluent-bit-logrotate.conf b/conf/fluent-bit-logrotate.conf new file mode 100644 index 00000000000..e1b18482e63 --- /dev/null +++ b/conf/fluent-bit-logrotate.conf @@ -0,0 +1,21 @@ +[SERVICE] + Flush 1 + Log_Level info + Parsers_File parsers.conf + +[INPUT] + Name dummy + Tag test.logrotate + Dummy {"message": "test log message", "level": "info"} + Rate 10 + +[OUTPUT] + Name logrotate + Match test.logrotate + Path /tmp/logs + File test.log + Format json + Max_Size 10M + Max_Files 5 + Gzip On + Mkdir On diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 65d417d7cbb..fd7d12af5a0 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -363,6 +363,7 @@ REGISTER_OUT_PLUGIN("out_datadog") REGISTER_OUT_PLUGIN("out_es") REGISTER_OUT_PLUGIN("out_exit") REGISTER_OUT_PLUGIN("out_file") +REGISTER_OUT_PLUGIN("out_logrotate") REGISTER_OUT_PLUGIN("out_forward") REGISTER_OUT_PLUGIN("out_http") REGISTER_OUT_PLUGIN("out_influxdb") diff --git a/plugins/out_logrotate/CMakeLists.txt b/plugins/out_logrotate/CMakeLists.txt new file mode 100644 index 00000000000..870d922320b --- /dev/null +++ b/plugins/out_logrotate/CMakeLists.txt @@ -0,0 +1,8 @@ +set(src + logrotate.c) + +if(MSVC) + FLB_PLUGIN(out_logrotate "${src}" "Shlwapi") +else() + FLB_PLUGIN(out_logrotate "${src}" "") +endif() diff --git a/plugins/out_logrotate/logrotate.c b/plugins/out_logrotate/logrotate.c new file mode 100644 index 00000000000..0342c29d94f --- /dev/null +++ b/plugins/out_logrotate/logrotate.c @@ -0,0 +1,1643 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#ifndef FLB_SYSTEM_WINDOWS +#include +#include /* dirname */ +#include +#endif +#include /* PRIu64 */ +#include /* PATH_MAX */ +#include +#include +#include + +#ifdef FLB_SYSTEM_WINDOWS +#include +#include +#include +#endif + +#include "logrotate.h" + +#ifdef FLB_SYSTEM_WINDOWS +#define NEWLINE "\r\n" +#define S_ISDIR(m) (((m) & S_IFMT) == S_IFDIR) +#else +#define NEWLINE "\n" +#endif + +#ifdef FLB_SYSTEM_WINDOWS +#define FLB_PATH_SEPARATOR "\\" +#else +#define FLB_PATH_SEPARATOR "/" +#endif + +/* Constants for streaming gzip compression */ +#define GZIP_CHUNK_SIZE \ + (16 * 1024) /* 16KB chunks to prevent stack overflow in coroutines */ +#define GZIP_HEADER_SIZE 10 +#define GZIP_FOOTER_SIZE 8 + +struct logrotate_file_size { + flb_sds_t filename; + size_t size; + flb_lock_t lock; /* Mutex to protect file operations */ + struct mk_list _head; +}; + +struct flb_logrotate_conf { + const char *out_path; + const char *out_file; + const char *delimiter; + const char *label_delimiter; + const char *template; + int format; + int csv_column_names; + int mkdir; + size_t max_size; /* Max file size */ + int max_files; /* Maximum number of rotated files to keep */ + int gzip; /* Whether to gzip rotated files */ + struct mk_list file_sizes; /* Linked list to store file size per filename */ + flb_lock_t list_lock; /* Lock to protect the file_sizes list */ + struct 
flb_output_instance *ins; +}; + +static char *check_delimiter(const char *str) { + if (str == NULL) { + return NULL; + } + + if (!strcasecmp(str, "\\t") || !strcasecmp(str, "tab")) { + return "\t"; + } else if (!strcasecmp(str, "space")) { + return " "; + } else if (!strcasecmp(str, "comma")) { + return ","; + } + + return NULL; +} + +static int cb_logrotate_init(struct flb_output_instance *ins, + struct flb_config *config, void *data) { + int ret; + const char *tmp; + char *ret_str; + (void)config; + (void)data; + struct flb_logrotate_conf *ctx; + + ctx = flb_calloc(1, sizeof(struct flb_logrotate_conf)); + if (!ctx) { + flb_errno(); + return -1; + } + ctx->ins = ins; + ctx->format = FLB_OUT_LOGROTATE_FMT_JSON; /* default */ + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + ctx->template = NULL; + + /* Initialize linked list to store file sizes per filename */ + mk_list_init(&ctx->file_sizes); + flb_lock_init(&ctx->list_lock); + + ret = flb_output_config_map_set(ins, (void *)ctx); + if (ret == -1) { + flb_free(ctx); + return -1; + } + + if (ctx->max_files <= 0) { + flb_plg_error(ctx->ins, "invalid max_files=%d; must be >= 1", + ctx->max_files); + flb_free(ctx); + return -1; + } + + /* Optional, file format */ + tmp = flb_output_get_property("Format", ins); + if (tmp) { + if (!strcasecmp(tmp, "csv")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_CSV; + ctx->delimiter = ","; + } else if (!strcasecmp(tmp, "ltsv")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_LTSV; + ctx->delimiter = "\t"; + ctx->label_delimiter = ":"; + } else if (!strcasecmp(tmp, "plain")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_PLAIN; + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + } else if (!strcasecmp(tmp, "msgpack")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_MSGPACK; + ctx->delimiter = NULL; + ctx->label_delimiter = NULL; + } else if (!strcasecmp(tmp, "template")) { + ctx->format = FLB_OUT_LOGROTATE_FMT_TEMPLATE; + } else if (!strcasecmp(tmp, "out_logrotate")) { + /* for explicit setting */ 
+ ctx->format = FLB_OUT_LOGROTATE_FMT_JSON; + } else { + flb_plg_error(ctx->ins, "unknown format %s. abort.", tmp); + flb_free(ctx); + return -1; + } + } + + tmp = flb_output_get_property("delimiter", ins); + ret_str = check_delimiter(tmp); + if (ret_str != NULL) { + ctx->delimiter = ret_str; + } + + tmp = flb_output_get_property("label_delimiter", ins); + ret_str = check_delimiter(tmp); + if (ret_str != NULL) { + ctx->label_delimiter = ret_str; + } + + /* Set the context */ + flb_output_set_context(ins, ctx); + + /* Log resolved configuration values */ + flb_plg_info(ctx->ins, + "logrotate plugin initialized with: max_size=%zu, max_files=%d, " + "gzip=%s, path=%s", + ctx->max_size, ctx->max_files, + ctx->gzip == FLB_TRUE ? "true" : "false", + ctx->out_path ? ctx->out_path : "not set"); + + return 0; +} + +static int csv_output(FILE *fp, int column_names, struct flb_time *tm, + msgpack_object *obj, struct flb_logrotate_conf *ctx) { + int i; + int map_size; + msgpack_object_kv *kv = NULL; + + if (obj->type == MSGPACK_OBJECT_MAP && obj->via.map.size > 0) { + kv = obj->via.map.ptr; + map_size = obj->via.map.size; + + if (column_names == FLB_TRUE) { + fprintf(fp, "timestamp%s", ctx->delimiter); + for (i = 0; i < map_size; i++) { + msgpack_object_print(fp, (kv + i)->key); + if (i + 1 < map_size) { + fprintf(fp, "%s", ctx->delimiter); + } + } + fprintf(fp, NEWLINE); + } + + fprintf(fp, "%lld.%.09ld%s", (long long)tm->tm.tv_sec, tm->tm.tv_nsec, + ctx->delimiter); + + for (i = 0; i < map_size - 1; i++) { + msgpack_object_print(fp, (kv + i)->val); + fprintf(fp, "%s", ctx->delimiter); + } + + msgpack_object_print(fp, (kv + (map_size - 1))->val); + fprintf(fp, NEWLINE); + } + return 0; +} + +static int ltsv_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, + struct flb_logrotate_conf *ctx) { + msgpack_object_kv *kv = NULL; + int i; + int map_size; + + if (obj->type == MSGPACK_OBJECT_MAP && obj->via.map.size > 0) { + kv = obj->via.map.ptr; + map_size = 
obj->via.map.size; + fprintf(fp, "\"time\"%s%f%s", ctx->label_delimiter, flb_time_to_double(tm), + ctx->delimiter); + + for (i = 0; i < map_size - 1; i++) { + msgpack_object_print(fp, (kv + i)->key); + fprintf(fp, "%s", ctx->label_delimiter); + msgpack_object_print(fp, (kv + i)->val); + fprintf(fp, "%s", ctx->delimiter); + } + + msgpack_object_print(fp, (kv + (map_size - 1))->key); + fprintf(fp, "%s", ctx->label_delimiter); + msgpack_object_print(fp, (kv + (map_size - 1))->val); + fprintf(fp, NEWLINE); + } + return 0; +} + +static int template_output_write(struct flb_logrotate_conf *ctx, FILE *fp, + struct flb_time *tm, msgpack_object *obj, + const char *key, int size) { + int i; + msgpack_object_kv *kv; + + /* + * Right now we treat "{time}" specially and fill the placeholder + * with the metadata timestamp (formatted as float). + */ + if (!strncmp(key, "time", size)) { + fprintf(fp, "%f", flb_time_to_double(tm)); + return 0; + } + + if (obj->type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "invalid object type (type=%i)", obj->type); + return -1; + } + + for (i = 0; i < obj->via.map.size; i++) { + kv = obj->via.map.ptr + i; + + if (size != kv->key.via.str.size) { + continue; + } + + if (!memcmp(key, kv->key.via.str.ptr, size)) { + if (kv->val.type == MSGPACK_OBJECT_STR) { + fwrite(kv->val.via.str.ptr, 1, kv->val.via.str.size, fp); + } else { + msgpack_object_print(fp, kv->val); + } + return 0; + } + } + return -1; +} + +/* + * Python-like string templating for out_logrotate. + * + * This accepts a format string like "my name is {name}" and fills + * placeholders using corresponding values in a record. + * + * e.g. 
{"name":"Tom"} => "my name is Tom" + */ +static int template_output(FILE *fp, struct flb_time *tm, msgpack_object *obj, + struct flb_logrotate_conf *ctx) { + int i; + int len = strlen(ctx->template); + int keysize; + const char *key; + const char *pos; + const char *inbrace = NULL; /* points to the last open brace */ + + for (i = 0; i < len; i++) { + pos = ctx->template + i; + if (*pos == '{') { + if (inbrace) { + /* + * This means that we find another open brace inside + * braces (e.g. "{a{b}"). Ignore the previous one. + */ + fwrite(inbrace, 1, pos - inbrace, fp); + } + inbrace = pos; + } else if (*pos == '}' && inbrace) { + key = inbrace + 1; + keysize = pos - inbrace - 1; + + if (template_output_write(ctx, fp, tm, obj, key, keysize)) { + fwrite(inbrace, 1, pos - inbrace + 1, fp); + } + inbrace = NULL; + } else { + if (!inbrace) { + fputc(*pos, fp); + } + } + } + + /* Handle an unclosed brace like "{abc" */ + if (inbrace) { + fputs(inbrace, fp); + } + fputs(NEWLINE, fp); + return 0; +} + +static int plain_output(FILE *fp, msgpack_object *obj, size_t alloc_size, + int escape_unicode) { + char *buf; + + buf = flb_msgpack_to_json_str(alloc_size, obj, escape_unicode); + if (buf) { + fprintf(fp, "%s" NEWLINE, buf); + flb_free(buf); + } + return 0; +} + +static void print_metrics_text(struct flb_output_instance *ins, FILE *fp, + const void *data, size_t bytes) { + int ret; + size_t off = 0; + cfl_sds_t text; + struct cmt *cmt = NULL; + + /* get cmetrics context */ + ret = cmt_decode_msgpack_create(&cmt, (char *)data, bytes, &off); + if (ret != 0) { + flb_plg_error(ins, "could not process metrics payload"); + return; + } + + /* convert to text representation */ + text = cmt_encode_text_create(cmt); + + /* destroy cmt context */ + cmt_destroy(cmt); + + fprintf(fp, "%s", text); + cmt_encode_text_destroy(text); +} + +static int mkpath(struct flb_output_instance *ins, const char *dir) { + struct stat st; + char *dup_dir = NULL; +#ifdef FLB_SYSTEM_MACOS + char *parent_dir = 
NULL; +#endif +#ifdef FLB_SYSTEM_WINDOWS + char parent_path[MAX_PATH]; + DWORD err; + char *p; + char *sep; +#endif + int ret; + + if (!dir) { + errno = EINVAL; + return -1; + } + + if (strlen(dir) == 0) { + errno = EINVAL; + return -1; + } + + if (stat(dir, &st) == 0) { + if (S_ISDIR(st.st_mode)) { + return 0; + } + flb_plg_error(ins, "%s is not a directory", dir); + errno = ENOTDIR; + return -1; + } + +#ifdef FLB_SYSTEM_WINDOWS + if (strncpy_s(parent_path, MAX_PATH, dir, _TRUNCATE) != 0) { + flb_plg_error(ins, "path is too long: %s", dir); + errno = ENAMETOOLONG; + return -1; + } + + p = parent_path; + + /* Skip the drive letter if present (e.g., "C:") */ + if (p[1] == ':') { + p += 2; + } + + /* Normalize all forward slashes to backslashes */ + while (*p != '\0') { + if (*p == '/') { + *p = '\\'; + } + p++; + } + + flb_plg_debug(ins, "processing path '%s'", parent_path); + sep = strstr(parent_path, FLB_PATH_SEPARATOR); + if (sep != NULL && PathRemoveFileSpecA(parent_path)) { + flb_plg_debug(ins, "creating directory (recursive) %s", parent_path); + ret = mkpath(ins, parent_path); + if (ret != 0) { + /* If creating the parent failed, we cannot continue. */ + return -1; + } + } + + flb_plg_debug(ins, "attempting to create final directory '%s'", dir); + if (!CreateDirectoryA(dir, NULL)) { + err = GetLastError(); + + if (err != ERROR_ALREADY_EXISTS) { + flb_plg_error(ins, "could not create directory '%s' (error=%lu)", dir, + err); + return -1; + } + } + + return 0; +#elif FLB_SYSTEM_MACOS + dup_dir = flb_strdup(dir); + if (!dup_dir) { + return -1; + } + + /* macOS's dirname(3) should return current directory when slash + * character is not included in passed string. + * And note that macOS's dirname(3) does not modify passed string.
+ */ + parent_dir = dirname(dup_dir); + if (stat(parent_dir, &st) == 0 && strncmp(parent_dir, ".", 1)) { + if (S_ISDIR(st.st_mode)) { + flb_plg_debug(ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, 0755); + flb_free(dup_dir); + return ret; + } + } + + ret = mkpath(ins, dirname(dup_dir)); + if (ret != 0) { + flb_free(dup_dir); + return ret; + } + flb_plg_debug(ins, "creating directory %s", dup_dir); + ret = mkdir(dup_dir, 0755); + if (ret == -1 && errno == EEXIST) { + if (stat(dup_dir, &st) == 0 && S_ISDIR(st.st_mode)) { + ret = 0; + } + } + flb_free(dup_dir); + return ret; +#else + dup_dir = flb_strdup(dir); + if (!dup_dir) { + return -1; + } + ret = mkpath(ins, dirname(dup_dir)); + flb_free(dup_dir); + if (ret != 0) { + return ret; + } + flb_plg_debug(ins, "creating directory %s", dir); + ret = mkdir(dir, 0755); + if (ret == -1 && errno == EEXIST) { + if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode)) { + ret = 0; + } + } + return ret; +#endif +} + +/* Helper function to find a file size entry by filename */ +static struct logrotate_file_size * +find_file_size_entry(struct flb_logrotate_conf *ctx, const char *filename) { + struct mk_list *head; + struct logrotate_file_size *entry; + + /* Caller must hold ctx->list_lock */ + mk_list_foreach(head, &ctx->file_sizes) { + entry = mk_list_entry(head, struct logrotate_file_size, _head); + if (entry->filename && strcmp(entry->filename, filename) == 0) { + return entry; + } + } + return NULL; +} + +/* Helper function to create file size entry */ +static struct logrotate_file_size * +create_file_size_entry(struct flb_logrotate_conf *ctx, const char *filename, + size_t size) { + struct logrotate_file_size *entry; + flb_sds_t filename_copy; + + /* Caller must hold ctx->list_lock */ + + /* Create new entry */ + entry = flb_calloc(1, sizeof(struct logrotate_file_size)); + if (!entry) { + flb_errno(); + return NULL; + } + + filename_copy = flb_sds_create(filename); + if (!filename_copy) { + flb_free(entry); + 
flb_errno(); + return NULL; + } + + entry->filename = filename_copy; + entry->size = size; + + /* Initialize mutex for this file entry */ + if (flb_lock_init(&entry->lock) != 0) { + flb_plg_error(ctx->ins, "failed to initialize mutex for file %s", filename); + flb_sds_destroy(filename_copy); + flb_free(entry); + return NULL; + } + + mk_list_add(&entry->_head, &ctx->file_sizes); + + return entry; +} + +/* Function to update file size counter for a specific file */ +static void update_file_size_counter(struct flb_logrotate_conf *ctx, + const char *filename, FILE *fp) { + struct stat st; + size_t file_size; + struct logrotate_file_size *entry; + + if (fstat(fileno(fp), &st) != 0 || st.st_size < 0) { + return; + } + + file_size = (size_t)st.st_size; + + /* Acquire list lock to find or create entry */ + if (flb_lock_acquire(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + return; + } + + /* Find or create file size entry */ + entry = find_file_size_entry(ctx, filename); + if (entry == NULL) { + /* Create entry (list_lock is already held) */ + entry = create_file_size_entry(ctx, filename, file_size); + if (entry == NULL) { + flb_plg_warn(ctx->ins, "failed to create file size entry for %s", + filename); + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + return; + } + } + + /* + * Hand-Over-Hand Locking: + * 1. We hold list_lock. + * 2. Acquire entry->lock. + * 3. Release list_lock. + * 4. Perform entry operations. + * 5. Release entry->lock. 
+ */ + + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_warn(ctx->ins, "failed to acquire lock for file %s", filename); + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + return; + } + + /* We can release the list lock now that we have the entry lock */ + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + + /* Update size atomically */ + entry->size = file_size; + + /* Release entry lock */ + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); +} + +/* Function to generate timestamp for rotated file */ +static void generate_timestamp(char *timestamp, size_t size) { + time_t now = time(NULL); + struct tm tm_info; + localtime_r(&now, &tm_info); + strftime(timestamp, size, "%Y%m%d_%H%M%S", &tm_info); +} + +/* Helper function to write gzip header (based on flb_gzip.c) */ +static void write_gzip_header(FILE *fp) { + uint8_t header[GZIP_HEADER_SIZE] = { + 0x1F, 0x8B, /* Magic bytes */ + 0x08, /* Compression method (deflate) */ + 0x00, /* Flags */ + 0x00, 0x00, 0x00, 0x00, /* Timestamp */ + 0x00, /* Compression flags */ + 0xFF /* OS (unknown) */ + }; + fwrite(header, 1, GZIP_HEADER_SIZE, fp); +} + +/* Helper function to write gzip footer */ +static void write_gzip_footer(FILE *fp, mz_ulong crc, size_t original_size) { + uint8_t footer[GZIP_FOOTER_SIZE]; + + /* Write CRC32 */ + footer[0] = crc & 0xFF; + footer[1] = (crc >> 8) & 0xFF; + footer[2] = (crc >> 16) & 0xFF; + footer[3] = (crc >> 24) & 0xFF; + + /* Write original size */ + footer[4] = original_size & 0xFF; + footer[5] = (original_size >> 8) & 0xFF; + footer[6] = (original_size >> 16) & 0xFF; + footer[7] = (original_size >> 24) & 0xFF; + + fwrite(footer, 1, GZIP_FOOTER_SIZE, fp); +} + +/* Function to compress a file using streaming gzip (memory-safe for large + * files) */ +static int gzip_compress_file(const 
char *input_filename, + const char *output_filename, + struct flb_output_instance *ins) { + FILE *src_fp = NULL, *dst_fp = NULL; + char *input_buffer = NULL, *output_buffer = NULL; + size_t bytes_read, output_buffer_size; + size_t total_input_size = 0; + mz_ulong crc = MZ_CRC32_INIT; + z_stream strm; + int ret = 0, flush, status; + int deflate_initialized = 0; + + /* Open source file */ + src_fp = fopen(input_filename, "rb"); + if (!src_fp) { + flb_plg_error(ins, "failed to open source file for gzip: %s", + input_filename); + return -1; + } + + /* Open destination file */ + dst_fp = fopen(output_filename, "wb"); + if (!dst_fp) { + flb_plg_error(ins, "failed to create gzip file: %s", output_filename); + fclose(src_fp); + return -1; + } + + /* Allocate input and output buffers */ + input_buffer = flb_malloc(GZIP_CHUNK_SIZE); + /* Use mz_compressBound to ensure sufficient buffer size for miniz */ + output_buffer_size = mz_compressBound(GZIP_CHUNK_SIZE); + output_buffer = flb_malloc(output_buffer_size); + + if (!input_buffer || !output_buffer) { + flb_plg_error(ins, "failed to allocate compression buffers"); + ret = -1; + goto cleanup; + } + + /* Write gzip header */ + write_gzip_header(dst_fp); + + /* Initialize deflate stream (raw deflate without gzip wrapper) */ + memset(&strm, 0, sizeof(strm)); + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + + status = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + -Z_DEFAULT_WINDOW_BITS, 9, Z_DEFAULT_STRATEGY); + if (status != Z_OK) { + flb_plg_error(ins, "failed to initialize deflate stream"); + ret = -1; + goto cleanup; + } + deflate_initialized = 1; + + /* Process file in chunks */ + do { + bytes_read = fread(input_buffer, 1, GZIP_CHUNK_SIZE, src_fp); + if (bytes_read > 0) { + /* Update CRC and total size */ + crc = mz_crc32(crc, (const unsigned char *)input_buffer, bytes_read); + total_input_size += bytes_read; + + /* Set up deflate input */ + strm.next_in = (Bytef *)input_buffer; + 
strm.avail_in = bytes_read; + + /* Determine flush mode based on EOF after this read */ + flush = feof(src_fp) ? Z_FINISH : Z_NO_FLUSH; + + /* Compress chunk */ + do { + strm.next_out = (Bytef *)output_buffer; + strm.avail_out = output_buffer_size; + + status = deflate(&strm, flush); + if (status == Z_STREAM_ERROR) { + flb_plg_error(ins, "deflate stream error during compression"); + ret = -1; + goto deflate_cleanup; + } + + /* Write compressed data */ + size_t compressed_bytes = output_buffer_size - strm.avail_out; + if (compressed_bytes > 0) { + if (fwrite(output_buffer, 1, compressed_bytes, dst_fp) != + compressed_bytes) { + flb_plg_error(ins, "failed to write compressed data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (strm.avail_out == 0); + + /* Verify all input was consumed */ + if (strm.avail_in != 0) { + flb_plg_error(ins, "deflate did not consume all input data"); + ret = -1; + goto deflate_cleanup; + } + } + } while (bytes_read > 0 && status != Z_STREAM_END); + + /* + * If the file size is a multiple of GZIP_CHUNK_SIZE, the loop above finishes + * because bytes_read == 0, but Z_FINISH was never called (flush was + * Z_NO_FLUSH). We must ensure the stream is finished. 
+ */ + if (status != Z_STREAM_END) { + strm.next_in = Z_NULL; + strm.avail_in = 0; + + do { + strm.next_out = (Bytef *)output_buffer; + strm.avail_out = output_buffer_size; + + status = deflate(&strm, Z_FINISH); + if (status == Z_STREAM_ERROR) { + flb_plg_error(ins, "deflate stream error during final flush"); + ret = -1; + goto deflate_cleanup; + } + + size_t compressed_bytes = output_buffer_size - strm.avail_out; + if (compressed_bytes > 0) { + if (fwrite(output_buffer, 1, compressed_bytes, dst_fp) != + compressed_bytes) { + flb_plg_error(ins, "failed to write compressed data (final flush)"); + ret = -1; + goto deflate_cleanup; + } + } + } while (status != Z_STREAM_END); + } + + /* Verify compression completed successfully */ + if (status != Z_STREAM_END) { + flb_plg_error(ins, "compression did not complete properly"); + ret = -1; + } else { + /* Write gzip footer (CRC32 + original size) */ + write_gzip_footer(dst_fp, crc, total_input_size); + } + +deflate_cleanup: + if (deflate_initialized) { + deflateEnd(&strm); + deflate_initialized = 0; + } + +cleanup: + if (input_buffer) { + flb_free(input_buffer); + input_buffer = NULL; + } + if (output_buffer) { + flb_free(output_buffer); + output_buffer = NULL; + } + if (src_fp) { + fclose(src_fp); + src_fp = NULL; + } + if (dst_fp) { + fclose(dst_fp); + dst_fp = NULL; + } + + return ret; +} + +/* Function to rotate file */ +static int rotate_file(struct flb_logrotate_conf *ctx, const char *filename, + struct logrotate_file_size *entry) { + char timestamp[32]; + char *rotated_filename = NULL; + char *gzip_filename = NULL; + size_t file_size = 0; + int ret = 0; + + /* Caller must hold entry->lock */ + + rotated_filename = flb_malloc(PATH_MAX); + if (!rotated_filename) { + flb_errno(); + return -1; + } + + if (ctx->gzip == FLB_TRUE) { + gzip_filename = flb_malloc(PATH_MAX); + if (!gzip_filename) { + flb_free(rotated_filename); + flb_errno(); + return -1; + } + } + + file_size = entry->size; + + /* Log rotation event */ + 
flb_plg_info(ctx->ins, "rotating file: %s (current size: %zu bytes)", + filename, file_size); + + /* Generate timestamp */ + generate_timestamp(timestamp, sizeof(timestamp)); + + /* Create rotated filename with timestamp */ + snprintf(rotated_filename, PATH_MAX - 1, "%s.%s", filename, timestamp); + + /* Rename current file to rotated filename */ + if (rename(filename, rotated_filename) != 0) { + flb_plg_error(ctx->ins, "failed to rename file from %s to %s", filename, + rotated_filename); + ret = -1; + goto cleanup; + } + + /* If gzip is enabled, compress the rotated file */ + if (ctx->gzip == FLB_TRUE) { + snprintf(gzip_filename, PATH_MAX - 1, "%s.gz", rotated_filename); + flb_plg_debug(ctx->ins, "compressing file: %s to %s", rotated_filename, + gzip_filename); + ret = gzip_compress_file(rotated_filename, gzip_filename, ctx->ins); + if (ret == 0) { + /* Remove the uncompressed file */ +#ifdef FLB_SYSTEM_WINDOWS + DeleteFileA(rotated_filename); +#else + unlink(rotated_filename); +#endif + flb_plg_debug(ctx->ins, "rotated and compressed file: %s", gzip_filename); + } else { + /* Remove the failed gzip file */ +#ifdef FLB_SYSTEM_WINDOWS + DeleteFileA(gzip_filename); +#else + unlink(gzip_filename); +#endif + ret = -1; + goto cleanup; + } + } else { + flb_plg_debug(ctx->ins, "rotated file: %s (no compression)", + rotated_filename); + } + + /* Reset file size in the entry since we rotated */ + entry->size = 0; + +cleanup: + if (rotated_filename) { + flb_free(rotated_filename); + } + if (gzip_filename) { + flb_free(gzip_filename); + } + + return ret; +} + +/* + * Function to validate if a filename matches the rotation pattern format + * Valid formats: + * - base_filename.YYYYMMDD_HHMMSS (15 chars after pattern) + * - base_filename.YYYYMMDD_HHMMSS.gz (18 chars after pattern) + */ +static int is_valid_rotation_filename(const char *filename, + const char *pattern) { + size_t pattern_len = strlen(pattern); + size_t filename_len = strlen(filename); + const char *suffix; + 
size_t suffix_len; + int i; + + /* Check that filename starts with pattern */ + if (strncmp(filename, pattern, pattern_len) != 0) { + return 0; + } + + /* Get the suffix after the pattern */ + suffix = filename + pattern_len; + suffix_len = filename_len - pattern_len; + + /* Must be exactly 15 or 18 characters */ + if (suffix_len != 15 && suffix_len != 18) { + return 0; + } + + /* For 18 characters, must end with .gz */ + if (suffix_len == 18) { + if (strcmp(suffix + 15, ".gz") != 0) { + return 0; + } + } + + /* Validate timestamp format: YYYYMMDD_HHMMSS + * - 8 digits (YYYYMMDD) + * - underscore at position 8 + * - 6 digits (HHMMSS) + */ + for (i = 0; i < 8; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + if (suffix[8] != '_') { + return 0; + } + for (i = 9; i < 15; i++) { + if (suffix[i] < '0' || suffix[i] > '9') { + return 0; + } + } + + return 1; +} + +/* Function to clean up old rotated files */ +static int cleanup_old_files(struct flb_logrotate_conf *ctx, + const char *directory, const char *base_filename) { + char pattern[PATH_MAX]; + char full_path[PATH_MAX]; +#ifdef FLB_SYSTEM_WINDOWS + char search_path[PATH_MAX]; +#endif + char **files = NULL; + int file_count = 0; + int max_files = ctx->max_files; + int i, j; + + /* Create pattern to match rotated files */ + snprintf(pattern, PATH_MAX - 1, "%s.", base_filename); + +#ifdef FLB_SYSTEM_WINDOWS + HANDLE hFind; + WIN32_FIND_DATA findData; + + /* Create search path: directory\* */ + snprintf(search_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "*", directory); + + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + do { + /* Skip . and .. 
*/ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern)) { + file_count++; + } + } while (FindNextFileA(hFind, &findData) != 0); + + if (file_count <= max_files) { + FindClose(hFind); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + FindClose(hFind); + return -1; + } + + /* Collect file names - restart search */ + FindClose(hFind); + hFind = FindFirstFileA(search_path, &findData); + if (hFind == INVALID_HANDLE_VALUE) { + flb_free(files); + return -1; + } + + i = 0; + do { + /* Skip . and .. */ + if (strcmp(findData.cFileName, ".") == 0 || + strcmp(findData.cFileName, "..") == 0) { + continue; + } + /* Skip directories */ + if (findData.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + continue; + } + if (is_valid_rotation_filename(findData.cFileName, pattern) && + i < file_count) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", directory, + findData.cFileName); + files[i] = flb_strdup(full_path); + i++; + } + } while (FindNextFileA(hFind, &findData) != 0 && i < file_count); + + FindClose(hFind); +#else + DIR *dir; + struct dirent *entry; + + dir = opendir(directory); + if (!dir) { + return 0; /* Directory doesn't exist or can't be opened */ + } + + /* Count matching files */ + while ((entry = readdir(dir)) != NULL) { + if (is_valid_rotation_filename(entry->d_name, pattern)) { + file_count++; + } + } + + if (file_count <= max_files) { + closedir(dir); + return 0; + } + + /* Allocate array for file names */ + files = flb_calloc(file_count, sizeof(char *)); + if (!files) { + closedir(dir); + return -1; + } + + /* Collect file names */ + rewinddir(dir); + i = 0; + while ((entry = readdir(dir)) != NULL && i < file_count) { + if 
(is_valid_rotation_filename(entry->d_name, pattern)) { + snprintf(full_path, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", directory, + entry->d_name); + files[i] = flb_strdup(full_path); + i++; + } + } + closedir(dir); +#endif + + /* Sort files by modification time (oldest first) */ + for (i = 0; i < file_count - 1; i++) { + for (j = i + 1; j < file_count; j++) { + struct stat st1; + struct stat st2; + + if (!files[i] || !files[j]) { + continue; + } + + if (stat(files[i], &st1) == 0 && stat(files[j], &st2) == 0) { + if (st1.st_mtime > st2.st_mtime) { + char *temp = files[i]; + files[i] = files[j]; + files[j] = temp; + } + } + } + } + + /* Remove oldest files */ + if (file_count > max_files) { + flb_plg_info( + ctx->ins, + "cleaning up old rotated files: removing %d files (keeping %d)", + file_count - max_files, max_files); + } + for (i = 0; i < file_count - max_files; i++) { + if (files[i]) { +#ifdef FLB_SYSTEM_WINDOWS + if (DeleteFileA(files[i]) != 0) { +#else + if (unlink(files[i]) == 0) { +#endif + flb_plg_debug(ctx->ins, "removed old rotated file: %s", files[i]); + } + flb_free(files[i]); + } + } + + /* Free remaining file names */ + for (i = file_count - max_files; i < file_count; i++) { + if (files[i]) { + flb_free(files[i]); + } + } + + flb_free(files); + return 0; +} + +static void cb_logrotate_flush(struct flb_event_chunk *event_chunk, + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, + void *out_context, struct flb_config *config) { + int ret; + int ret_val = FLB_OK; + int column_names; + FILE *fp; + size_t off = 0; + size_t last_off = 0; + size_t alloc_size = 0; + size_t total; + size_t file_size = 0; + char *out_file = NULL; + char *buf; + char *out_file_copy; + char *directory = NULL; + char *base_filename = NULL; + long file_pos; + bool have_directory; + struct flb_logrotate_conf *ctx = out_context; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + struct logrotate_file_size *entry = NULL; + struct 
stat st; + + (void)config; + + /* Set the right output file */ + + out_file = flb_malloc(PATH_MAX); + if (!out_file) { + flb_errno(); + FLB_OUTPUT_RETURN(FLB_ERROR); + } + + if (ctx->out_path) { + if (ctx->out_file) { + snprintf(out_file, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + ctx->out_path, ctx->out_file); + } else { + snprintf(out_file, PATH_MAX - 1, "%s" FLB_PATH_SEPARATOR "%s", + ctx->out_path, event_chunk->tag); + } + } else { + if (ctx->out_file) { + snprintf(out_file, PATH_MAX - 1, "%s", ctx->out_file); + } else { + snprintf(out_file, PATH_MAX - 1, "%s", event_chunk->tag); + } + } + + /* Find or create file size entry and acquire lock (Hand-Over-Hand) */ + if (flb_lock_acquire(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + ret_val = FLB_ERROR; + goto cleanup; + } + + entry = find_file_size_entry(ctx, out_file); + if (entry == NULL) { + /* Entry doesn't exist yet, create it with initial size 0 */ + entry = create_file_size_entry(ctx, out_file, 0); + if (entry == NULL) { + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + ret_val = FLB_ERROR; + goto cleanup; + } + } + + /* Acquire lock before any file operations */ + if (flb_lock_acquire(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY) != 0) { + flb_plg_error(ctx->ins, "failed to acquire lock for file %s", out_file); + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + ret_val = FLB_ERROR; + goto cleanup; + } + + /* Release list lock now that we hold the entry lock */ + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + + /* Check if file needs rotation based on current size counter */ + file_size = entry->size; + + if (file_size >= ctx->max_size) { + have_directory = false; + + directory = flb_malloc(PATH_MAX); + if (!directory) { + flb_errno(); + ret_val = FLB_ERROR; + goto cleanup; + } + 
directory[0] = '\0'; + + base_filename = flb_malloc(PATH_MAX); + if (!base_filename) { + flb_errno(); + ret_val = FLB_ERROR; + goto cleanup; + } + + /* Extract directory and base filename for cleanup */ + out_file_copy = flb_strdup(out_file); + if (out_file_copy) { +#ifdef FLB_SYSTEM_WINDOWS + PathRemoveFileSpecA(out_file_copy); + strncpy(directory, out_file_copy, PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#else + strncpy(directory, dirname(out_file_copy), PATH_MAX - 1); + directory[PATH_MAX - 1] = '\0'; +#endif + flb_free(out_file_copy); + have_directory = true; + } + + /* Get base filename for cleanup */ + { + char *last_sep = strrchr(out_file, FLB_PATH_SEPARATOR[0]); + if (last_sep) { + strncpy(base_filename, last_sep + 1, PATH_MAX - 1); + } else { + strncpy(base_filename, out_file, PATH_MAX - 1); + } + base_filename[PATH_MAX - 1] = '\0'; + } + + /* Rotate the file - passing entry, with lock held */ + if (rotate_file(ctx, out_file, entry) == 0) { + /* Clean up old rotated files */ + if (have_directory) { + cleanup_old_files(ctx, directory, base_filename); + } + } + } + + /* Open output file with default name as the Tag */ + fp = fopen(out_file, "ab+"); + if (ctx->mkdir == FLB_TRUE && fp == NULL && errno == ENOENT) { + out_file_copy = flb_strdup(out_file); + if (out_file_copy) { +#ifdef FLB_SYSTEM_WINDOWS + PathRemoveFileSpecA(out_file_copy); + ret = mkpath(ctx->ins, out_file_copy); +#else + ret = mkpath(ctx->ins, dirname(out_file_copy)); +#endif + flb_free(out_file_copy); + if (ret == 0) { + fp = fopen(out_file, "ab+"); + } + } + } + if (fp == NULL) { + flb_errno(); + flb_plg_error(ctx->ins, "error opening: %s", out_file); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_ERROR; + goto cleanup; + } + + /* Initialize file size counter if this is a new file (logic moved to unified + * block above) */ + /* If we are here, we are 
guaranteed to have an entry and hold the lock */ + if (entry->size == 0) { + update_file_size_counter(ctx, out_file, fp); + /* update_file_size_counter cycles locks internally which is bad since we + * hold one. */ + /* Actually, since we already have the entry and the lock, we should just + * update size directly! */ + /* But wait, we opened the file. We should just check fstat. */ + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } + + /* + * Get current file stream position, we gather this in case 'csv' format + * needs to write the column names. + */ + file_pos = ftell(fp); + + /* Check if the event type is metrics, handle the payload differently */ + if (event_chunk->type == FLB_INPUT_METRICS) { + print_metrics_text(ctx->ins, fp, event_chunk->data, event_chunk->size); + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } + fclose(fp); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_OK; + goto cleanup; + } + + /* + * Msgpack output format used to create unit tests files, useful for + * Fluent Bit developers. 
+ */ + if (ctx->format == FLB_OUT_LOGROTATE_FMT_MSGPACK) { + off = 0; + total = 0; + + do { + ret = fwrite((char *)event_chunk->data + off, 1, event_chunk->size - off, + fp); + if (ret < 0) { + flb_errno(); + fclose(fp); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_RETRY; + goto cleanup; + } + total += ret; + } while (total < event_chunk->size); + + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } + fclose(fp); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_OK; + goto cleanup; + } + + ret = flb_log_event_decoder_init(&log_decoder, (char *)event_chunk->data, + event_chunk->size); + + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_plg_error(ctx->ins, "Log event decoder initialization error : %d", ret); + + /* Update file size counter before closing - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } + fclose(fp); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_ERROR; + goto cleanup; + } + + /* + * Upon flush, for each array, lookup the time and the first field + * of the map to use as a data point. 
+ */ + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == + FLB_EVENT_DECODER_SUCCESS) { + alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */ + last_off = off; + + switch (ctx->format) { + case FLB_OUT_LOGROTATE_FMT_JSON: + buf = flb_msgpack_to_json_str(alloc_size, log_event.body, + config->json_escape_unicode); + if (buf) { + fprintf(fp, "%s: [%" PRIu64 ".%09lu, %s]" NEWLINE, event_chunk->tag, + (uint64_t)log_event.timestamp.tm.tv_sec, + log_event.timestamp.tm.tv_nsec, buf); + flb_free(buf); + } else { + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } + flb_log_event_decoder_destroy(&log_decoder); + fclose(fp); + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + } + ret_val = FLB_RETRY; + goto cleanup; + } + break; + case FLB_OUT_LOGROTATE_FMT_CSV: + if (ctx->csv_column_names == FLB_TRUE && file_pos == 0) { + column_names = FLB_TRUE; + file_pos = 1; + } else { + column_names = FLB_FALSE; + } + csv_output(fp, column_names, &log_event.timestamp, log_event.body, ctx); + break; + case FLB_OUT_LOGROTATE_FMT_LTSV: + ltsv_output(fp, &log_event.timestamp, log_event.body, ctx); + break; + case FLB_OUT_LOGROTATE_FMT_PLAIN: + plain_output(fp, log_event.body, alloc_size, config->json_escape_unicode); + break; + case FLB_OUT_LOGROTATE_FMT_TEMPLATE: + template_output(fp, &log_event.timestamp, log_event.body, ctx); + break; + } + } + + flb_log_event_decoder_destroy(&log_decoder); + + /* Update file size counter - we already hold the lock */ + if (entry != NULL) { + if (fstat(fileno(fp), &st) == 0 && st.st_size >= 0) { + entry->size = (size_t)st.st_size; + } + } + fclose(fp); + + /* Release lock before returning */ + if (entry != NULL) { + flb_lock_release(&entry->lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + 
FLB_LOCK_DEFAULT_RETRY_DELAY); + } + + ret_val = FLB_OK; + +cleanup: + if (out_file) { + flb_free(out_file); + } + if (directory) { + flb_free(directory); + } + if (base_filename) { + flb_free(base_filename); + } + FLB_OUTPUT_RETURN(ret_val); +} + +static int cb_logrotate_exit(void *data, struct flb_config *config) { + struct flb_logrotate_conf *ctx = data; + struct mk_list *head; + struct mk_list *tmp; + struct logrotate_file_size *entry; + + if (!ctx) { + return 0; + } + + /* Free all file size entries from linked list */ + /* Free all file size entries from linked list */ + flb_lock_acquire(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + mk_list_foreach_safe(head, tmp, &ctx->file_sizes) { + entry = mk_list_entry(head, struct logrotate_file_size, _head); + mk_list_del(&entry->_head); + /* Destroy mutex before freeing entry */ + flb_lock_destroy(&entry->lock); + if (entry->filename) { + flb_sds_destroy(entry->filename); + } + flb_free(entry); + } + flb_lock_release(&ctx->list_lock, FLB_LOCK_DEFAULT_RETRY_LIMIT, + FLB_LOCK_DEFAULT_RETRY_DELAY); + flb_lock_destroy(&ctx->list_lock); + + flb_free(ctx); + return 0; +} + +/* Configuration properties map */ +static struct flb_config_map config_map[] = { + {FLB_CONFIG_MAP_STR, "path", NULL, 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, out_path), + "Absolute path to store the files. This parameter is optional"}, + + {FLB_CONFIG_MAP_STR, "file", NULL, 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, out_file), + "Name of the target file to write the records. If 'path' is specified, " + "the value is prefixed"}, + + {FLB_CONFIG_MAP_STR, "format", NULL, 0, FLB_FALSE, 0, + "Specify the output data format, the available options are: plain (json), " + "csv, ltsv and template. 
If no value is set the outgoing data is " + "formatted " + "using the tag and the record in json"}, + + {FLB_CONFIG_MAP_STR, "delimiter", NULL, 0, FLB_FALSE, 0, + "Set a custom delimiter for the records"}, + + {FLB_CONFIG_MAP_STR, "label_delimiter", NULL, 0, FLB_FALSE, 0, + "Set a custom label delimiter, to be used with 'ltsv' format"}, + + {FLB_CONFIG_MAP_STR, "template", "{time} {message}", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, template), + "Set a custom template format for the data"}, + + {FLB_CONFIG_MAP_BOOL, "csv_column_names", "false", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, csv_column_names), + "Add column names (keys) in the first line of the target file"}, + + {FLB_CONFIG_MAP_BOOL, "mkdir", "false", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, mkdir), + "Recursively create output directory if it does not exist. Permissions " + "set to 0755"}, + + {FLB_CONFIG_MAP_SIZE, "max_size", "100000000", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, max_size), + "Maximum size of file before rotation (default: 100M)"}, + + {FLB_CONFIG_MAP_INT, "max_files", "7", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, max_files), + "Maximum number of rotated files to keep (default: 7)"}, + + {FLB_CONFIG_MAP_BOOL, "gzip", "true", 0, FLB_TRUE, + offsetof(struct flb_logrotate_conf, gzip), + "Whether to gzip rotated files (default: true)"}, + + /* EOF */ + {0}}; + +struct flb_output_plugin out_logrotate_plugin = { + .name = "logrotate", + .description = "Generate log file with rotation", + .cb_init = cb_logrotate_init, + .cb_flush = cb_logrotate_flush, + .cb_exit = cb_logrotate_exit, + .flags = 0, + .workers = 1, + .event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS, + .config_map = config_map, +}; diff --git a/plugins/out_logrotate/logrotate.h b/plugins/out_logrotate/logrotate.h new file mode 100644 index 00000000000..82d77de732f --- /dev/null +++ b/plugins/out_logrotate/logrotate.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; 
indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_OUT_LOGROTATE +#define FLB_OUT_LOGROTATE + +enum { + FLB_OUT_LOGROTATE_FMT_JSON, + FLB_OUT_LOGROTATE_FMT_CSV, + FLB_OUT_LOGROTATE_FMT_LTSV, + FLB_OUT_LOGROTATE_FMT_PLAIN, + FLB_OUT_LOGROTATE_FMT_MSGPACK, + FLB_OUT_LOGROTATE_FMT_TEMPLATE, +}; + +#endif From 776dd3b9a3e05878bff2d26c8bf41a10ca4c725b Mon Sep 17 00:00:00 2001 From: SagiROosto Date: Wed, 10 Dec 2025 12:35:15 +0200 Subject: [PATCH 2/6] tests: add out_logrotate tests Signed-off-by: SagiROosto --- tests/runtime/CMakeLists.txt | 1 + tests/runtime/out_logrotate.c | 1474 +++++++++++++++++++++++++++++++++ 2 files changed, 1475 insertions(+) create mode 100644 tests/runtime/out_logrotate.c diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index dd76c16faee..91fa07eb21a 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -234,6 +234,7 @@ if(FLB_IN_LIB) # These plugins work only on Linux if(NOT FLB_SYSTEM_WINDOWS) FLB_RT_TEST(FLB_OUT_FILE "out_file.c") + FLB_RT_TEST(FLB_OUT_LOGROTATE "out_logrotate.c") endif() FLB_RT_TEST(FLB_OUT_S3 "out_s3.c") FLB_RT_TEST(FLB_OUT_TD "out_td.c") diff --git a/tests/runtime/out_logrotate.c b/tests/runtime/out_logrotate.c new file mode 100644 index 00000000000..0058537b9fa --- /dev/null +++ b/tests/runtime/out_logrotate.c @@ -0,0 
+1,1474 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include "flb_tests_runtime.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifndef FLB_SYSTEM_WINDOWS +#include +#include +#define TEST_MKDIR(path) mkdir(path, 0755) +#define PATH_SEPARATOR "/" +#else +#include +#include +#define TEST_MKDIR(path) _mkdir(path) +#define PATH_SEPARATOR "\\" +/* Windows S_ISDIR compatibility */ +#ifndef S_ISDIR +#define S_ISDIR(mode) (((mode) & S_IFMT) == S_IFDIR) +#endif +#endif + +/* Test data */ +#include "data/common/json_invalid.h" /* JSON_INVALID */ +#include "data/common/json_long.h" /* JSON_LONG */ +#include "data/common/json_small.h" /* JSON_SMALL */ + +/* Test functions */ +void flb_test_logrotate_basic_rotation(void); +void flb_test_logrotate_gzip_compression(void); +void flb_test_logrotate_gzip_compression_exact_chunk(void); +void flb_test_logrotate_max_files_cleanup(void); +void flb_test_logrotate_max_files_validation(void); +void flb_test_logrotate_format_csv(void); +void flb_test_logrotate_format_ltsv(void); +void flb_test_logrotate_format_plain(void); +void flb_test_logrotate_format_msgpack(void); +void flb_test_logrotate_format_template(void); +void flb_test_logrotate_path(void); +void flb_test_logrotate_mkdir(void); +void flb_test_logrotate_delimiter(void); +void flb_test_logrotate_label_delimiter(void); +void flb_test_logrotate_csv_column_names(void); +void flb_test_logrotate_multithreaded(void); + +/* Test list */ +TEST_LIST = {{"basic_rotation", flb_test_logrotate_basic_rotation}, + {"gzip_compression", flb_test_logrotate_gzip_compression}, + {"gzip_compression_exact_chunk", + flb_test_logrotate_gzip_compression_exact_chunk}, + {"max_files_cleanup", flb_test_logrotate_max_files_cleanup}, + {"max_files_validation", flb_test_logrotate_max_files_validation}, + + {"format_csv", flb_test_logrotate_format_csv}, + {"format_ltsv", flb_test_logrotate_format_ltsv}, + 
{"format_plain", flb_test_logrotate_format_plain}, + {"format_msgpack", flb_test_logrotate_format_msgpack}, + {"format_template", flb_test_logrotate_format_template}, + {"path", flb_test_logrotate_path}, + {"mkdir", flb_test_logrotate_mkdir}, + {"delimiter", flb_test_logrotate_delimiter}, + {"label_delimiter", flb_test_logrotate_label_delimiter}, + {"csv_column_names", flb_test_logrotate_csv_column_names}, + {"multithreaded", flb_test_logrotate_multithreaded}, + {NULL, NULL}}; + +#define TEST_LOGFILE "flb_test_logrotate.log" +#define TEST_LOGPATH "out_logrotate" +#define TEST_TIMEOUT 10 + +/* Helper function to recursively delete directory and all its contents */ +static int recursive_delete_directory(const char *dir_path) { +#ifdef FLB_SYSTEM_WINDOWS + WIN32_FIND_DATAA ffd; + HANDLE hFind = INVALID_HANDLE_VALUE; + char search_path[PATH_MAX]; + char file_path[PATH_MAX]; + int ret = 0; + + if (dir_path == NULL) { + return -1; + } + + /* Create search path: dir_path\* */ + snprintf(search_path, sizeof(search_path), "%s\\*", dir_path); + search_path[sizeof(search_path) - 1] = '\0'; + + hFind = FindFirstFileA(search_path, &ffd); + if (hFind == INVALID_HANDLE_VALUE) { + /* Directory doesn't exist or can't be opened, consider it success */ + return 0; + } + + do { + /* Skip . and .. 
*/ + if (strcmp(ffd.cFileName, ".") == 0 || strcmp(ffd.cFileName, "..") == 0) { + continue; + } + + /* Build full path */ + snprintf(file_path, sizeof(file_path), "%s\\%s", dir_path, ffd.cFileName); + file_path[sizeof(file_path) - 1] = '\0'; + + /* Recursively delete subdirectories */ + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY) { + if (recursive_delete_directory(file_path) != 0) { + ret = -1; + } + } else { + /* Delete file - clear read-only if needed */ + if (ffd.dwFileAttributes & FILE_ATTRIBUTE_READONLY) { + SetFileAttributesA(file_path, + ffd.dwFileAttributes & ~FILE_ATTRIBUTE_READONLY); + } + if (DeleteFileA(file_path) == 0) { + ret = -1; + } + } + } while (FindNextFileA(hFind, &ffd) != 0); + + FindClose(hFind); + + /* Remove the directory itself */ + if (RemoveDirectoryA(dir_path) == 0) { + ret = -1; + } + + return ret; +#else + DIR *dir; + struct dirent *entry; + struct stat statbuf; + char path[PATH_MAX]; + int ret = 0; + + if (dir_path == NULL) { + return -1; + } + + /* Check if directory exists */ + if (stat(dir_path, &statbuf) != 0) { + /* Directory doesn't exist, consider it success */ + return 0; + } + + /* Check if it's actually a directory */ + if (!S_ISDIR(statbuf.st_mode)) { + /* Not a directory, try to remove as file */ + return remove(dir_path); + } + + /* Open directory */ + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + /* Iterate through directory entries */ + while ((entry = readdir(dir)) != NULL) { + /* Skip . and .. 
*/ + if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { + continue; + } + + /* Build full path */ + snprintf(path, sizeof(path), "%s/%s", dir_path, entry->d_name); + + /* Get file status */ + if (stat(path, &statbuf) != 0) { + continue; + } + + /* Recursively delete subdirectories */ + if (S_ISDIR(statbuf.st_mode)) { + if (recursive_delete_directory(path) != 0) { + ret = -1; + } + } else { + /* Delete file */ + if (unlink(path) != 0) { + ret = -1; + } + } + } + + closedir(dir); + + /* Remove the directory itself */ + if (rmdir(dir_path) != 0) { + ret = -1; + } + + return ret; +#endif +} + +/* Helper function to count files in directory */ +#ifdef FLB_SYSTEM_WINDOWS +static int count_files_in_directory(const char *dir_path, const char *prefix) { + WIN32_FIND_DATAA ffd; + HANDLE hFind = INVALID_HANDLE_VALUE; + char search_path[PATH_MAX]; + int count = 0; + + snprintf(search_path, sizeof(search_path), "%s\\*", dir_path); + hFind = FindFirstFileA(search_path, &ffd); + if (hFind == INVALID_HANDLE_VALUE) { + return -1; + } + + do { + if (strncmp(ffd.cFileName, prefix, strlen(prefix)) == 0) { + count++; + } + } while (FindNextFileA(hFind, &ffd) != 0); + + FindClose(hFind); + return count; +} +#else +static int count_files_in_directory(const char *dir_path, const char *prefix) { + DIR *dir; + struct dirent *entry; + int count = 0; + + dir = opendir(dir_path); + if (dir == NULL) { + return -1; + } + + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, strlen(prefix)) == 0) { + count++; + } + } + + closedir(dir); + return count; +} +#endif + +/* + * Helper function: Wait for a file matching the pattern "prefix*gz" to appear + * in dir_path + */ +#ifdef FLB_SYSTEM_WINDOWS +static int wait_for_file_pattern(const char *dir_path, const char *prefix, + const char *suffix, int time_limit) { + int elapsed_time, found = 0; + WIN32_FIND_DATAA ffd; + HANDLE hFind = INVALID_HANDLE_VALUE; + char search_path[PATH_MAX]; + size_t 
prefix_len = strlen(prefix); + size_t suffix_len = strlen(suffix); + + snprintf(search_path, sizeof(search_path), "%s\\*", dir_path); + + for (elapsed_time = 0; elapsed_time < time_limit && !found; elapsed_time++) { + hFind = FindFirstFileA(search_path, &ffd); + if (hFind != INVALID_HANDLE_VALUE) { + do { + if (strncmp(ffd.cFileName, prefix, prefix_len) == 0 && + strlen(ffd.cFileName) > prefix_len + suffix_len && + strcmp(ffd.cFileName + strlen(ffd.cFileName) - suffix_len, + suffix) == 0) { + found = 1; + break; + } + } while (FindNextFileA(hFind, &ffd) != 0); + FindClose(hFind); + } + if (!found) { + flb_time_msleep(1000); + } + } + return found ? 0 : -1; +} +#else +static int wait_for_file_pattern(const char *dir_path, const char *prefix, + const char *suffix, int time_limit) { + int elapsed_time, found = 0; + DIR *dir; + struct dirent *entry; + size_t prefix_len = strlen(prefix); + size_t suffix_len = strlen(suffix); + + for (elapsed_time = 0; elapsed_time < time_limit && !found; elapsed_time++) { + dir = opendir(dir_path); + if (dir) { + while ((entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, prefix, prefix_len) == 0 && + strlen(entry->d_name) > prefix_len + suffix_len && + strcmp(entry->d_name + strlen(entry->d_name) - suffix_len, + suffix) == 0) { + found = 1; + break; + } + } + closedir(dir); + } + if (!found) { + flb_time_msleep(1000); + } + } + return found ? 
0 : -1; +} +#endif + +/* Helper function to read file content into buffer */ +static char *read_file_content(const char *filename, size_t *out_size) { + FILE *fp; + char *buffer; + struct stat st; + size_t size; + + if (stat(filename, &st) != 0) { + return NULL; + } + + size = st.st_size; + fp = fopen(filename, "rb"); + if (!fp) { + return NULL; + } + + buffer = flb_malloc(size + 1); + if (!buffer) { + fclose(fp); + return NULL; + } + + if (fread(buffer, 1, size, fp) != size) { + flb_free(buffer); + fclose(fp); + return NULL; + } + + buffer[size] = '\0'; + fclose(fp); + *out_size = size; + return buffer; +} + +/* Format Tests */ +void flb_test_logrotate_format_csv(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_csv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV format - should contain commas as delimiters */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* CSV 
should contain commas */ + TEST_CHECK(strstr(content, ",") != NULL); + /* CSV should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_ltsv(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_ltsv.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "ltsv", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify LTSV format - should contain colons (label delimiter) and tabs */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* LTSV should contain colons for label:value pairs */ + TEST_CHECK(strstr(content, ":") != NULL); + /* Should contain "time" label */ + TEST_CHECK(strstr(content, "time") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_plain(void) { 
+ int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_plain.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "plain", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify plain format - should be JSON without tag/timestamp prefix */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Plain format should contain JSON */ + TEST_CHECK(strstr(content, "{") != NULL); + /* Should not contain tag prefix like "test: [" */ + TEST_CHECK(strstr(content, "test: [") == NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_msgpack(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + struct stat st; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, 
sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_msgpack.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "msgpack", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify msgpack format - should be binary data */ + if (stat(logfile, &st) == 0) { + TEST_CHECK(st.st_size > 0); + /* Msgpack files should not be readable as text (no newlines in first bytes) + */ + fp = fopen(logfile, "rb"); + if (fp) { + unsigned char first_bytes[10]; + size_t read_bytes = fread(first_bytes, 1, 10, fp); + fclose(fp); + if (read_bytes > 0) { + /* + * Msgpack typically starts with array markers (0x91, 0x92, etc.) + * or map markers. Just verify it's not plain text JSON. 
+ */ + TEST_CHECK(first_bytes[0] != '{' && first_bytes[0] != '['); + } + } + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_format_template(void) { + int i; + int ret; + int bytes; + /* Use JSON with specific fields for template testing */ + const char *json_template = + "[1448403340, {\"message\": \"test log entry\", \"level\": \"info\"}]"; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_template.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "template", "template", + "{time} {message}", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = + flb_lib_push(ctx, in_ffd, (char *)json_template, strlen(json_template)); + TEST_CHECK(bytes == strlen(json_template)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify template format - should contain substituted values */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Template should contain the message value */ + TEST_CHECK(strstr(content, "test log entry") != NULL); + /* Should contain timestamp (as float) */ + TEST_CHECK(strstr(content, "1448403340") != NULL || + strstr(content, ".") != NULL); + 
flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Configuration Option Tests */ +void flb_test_logrotate_path(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + char test_path[PATH_MAX]; + + snprintf(test_path, sizeof(test_path), "%s" PATH_SEPARATOR "path_test", + TEST_LOGPATH); + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); +/* Construct logfile path - test_path is short so this is safe */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "path_test.log", + test_path); +#pragma GCC diagnostic pop + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "path", test_path, + "file", "path_test.log", "mkdir", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify file was created in the specified path */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_mkdir(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[PATH_MAX]; + 
char nested_path[PATH_MAX]; + struct stat st; + + snprintf(nested_path, sizeof(nested_path), + "%s" PATH_SEPARATOR "nested" PATH_SEPARATOR "deep" PATH_SEPARATOR + "path", + TEST_LOGPATH); +/* Construct logfile path - nested_path is short so this is safe */ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wformat-truncation" + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "test_mkdir.log", + nested_path); +#pragma GCC diagnostic pop + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "mkdir", "true", "max_size", "100M", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify nested directory was created */ + TEST_CHECK(stat(nested_path, &st) == 0); + TEST_CHECK(S_ISDIR(st.st_mode)); + + /* Verify file was created */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + fclose(fp); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_delimiter(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + 
snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "csv", "delimiter", "tab", "max_size", + "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify tab delimiter is used (should contain tabs, not commas) */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tab characters */ + int has_tab = 0; + int j; + for (j = 0; j < content_size; j++) { + if (content[j] == '\t') { + has_tab = 1; + break; + } + } + TEST_CHECK(has_tab); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_label_delimiter(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_label_delimiter.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd 
>= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "ltsv", "label_delimiter", "comma", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify custom label delimiter is used */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain "," as label delimiter (comma) */ + TEST_CHECK(strstr(content, ",") != NULL); + /* Should contain "time" label with comma delimiter */ + /* LTSV format prints "time" (with quotes) followed by delimiter */ + TEST_CHECK(strstr(content, "\"time\",") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_csv_column_names(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *content; + size_t content_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_csv_columns.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + 
"format", "csv", "csv_column_names", "true", + "max_size", "100M", "gzip", "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write some data */ + for (i = 0; i < 3; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify CSV column names header exists */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* First line should contain "timestamp" */ + TEST_CHECK(strstr(content, "timestamp") != NULL); + /* Should contain key names from JSON */ + TEST_CHECK(strstr(content, "key_0") != NULL); + flb_free(content); + } + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +/* Multithreaded Test */ +struct thread_data { + flb_ctx_t *ctx; + int in_ffd; + int thread_id; + int events_per_thread; + char *json_data; + size_t json_len; + int *success; + pthread_mutex_t *mutex; +}; + +static void *thread_worker(void *arg) { + struct thread_data *data = (struct thread_data *)arg; + int i; + int bytes; + + for (i = 0; i < data->events_per_thread; i++) { + bytes = + flb_lib_push(data->ctx, data->in_ffd, data->json_data, data->json_len); + if (bytes != (int)data->json_len) { + pthread_mutex_lock(data->mutex); + *data->success = 0; + pthread_mutex_unlock(data->mutex); + return NULL; + } + /* Small delay to allow interleaving */ + flb_time_msleep(10); + } + + return NULL; +} + +void flb_test_logrotate_multithreaded(void) { + int ret; + int i; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + pthread_t threads[8]; + struct thread_data thread_data[8]; + pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + int success = 1; + int num_threads = 4; + int events_per_thread = 10; + FILE *fp; + char *content; + size_t content_size; + int line_count = 0; + + /* Clean up any existing directory and contents 
*/ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + "test_multithreaded.log"); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "0.5", "Grace", "2", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "1M", "max_files", "5", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Prepare thread data */ + for (i = 0; i < num_threads; i++) { + thread_data[i].ctx = ctx; + thread_data[i].in_ffd = in_ffd; + thread_data[i].thread_id = i; + thread_data[i].events_per_thread = events_per_thread; + thread_data[i].json_data = p; + thread_data[i].json_len = strlen(p); + thread_data[i].success = &success; + thread_data[i].mutex = &mutex; + } + + /* Create and start threads */ + for (i = 0; i < num_threads; i++) { + ret = pthread_create(&threads[i], NULL, thread_worker, &thread_data[i]); + TEST_CHECK(ret == 0); + } + + /* Wait for all threads to complete */ + for (i = 0; i < num_threads; i++) { + pthread_join(threads[i], NULL); + } + + /* Wait for flush to complete - allow multiple flush cycles */ + flb_time_msleep(3000); + + /* Wait for file to exist and have content before stopping */ + ret = wait_for_file(logfile, 1000, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + flb_stop(ctx); + flb_destroy(ctx); + + /* Verify all data was written correctly */ + TEST_CHECK(success == 1); + + /* Verify file exists and has content */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp) { + char line[4096]; + while (fgets(line, sizeof(line), fp) != NULL) { + line_count++; + } + fclose(fp); + } + + /* Should have at least num_threads * 
events_per_thread records */ + /* (may be more due to JSON format adding tag prefix) */ + TEST_CHECK(line_count >= num_threads * events_per_thread); + + /* Verify file content is valid - read and check for expected data */ + content = read_file_content(logfile, &content_size); + TEST_CHECK(content != NULL); + if (content) { + /* Should contain tag */ + TEST_CHECK(strstr(content, "test") != NULL); + /* Should contain timestamp */ + TEST_CHECK(strstr(content, "1448403340") != NULL); + /* Count occurrences of key_0 to verify records */ + int key_count = 0; + char *pos = content; + while ((pos = strstr(pos, "key_0")) != NULL) { + key_count++; + pos++; + } + TEST_CHECK(key_count >= num_threads * events_per_thread); + flb_free(content); + } + + pthread_mutex_destroy(&mutex); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_basic_rotation(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + FILE *fp; + char logfile[512]; + time_t now = time(NULL); + struct tm *tm_info = localtime(&now); + char timestamp[32]; + + strftime(timestamp, sizeof(timestamp), "%Y%m%d_%H%M%S", tm_info); + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "5K", "max_files", "3", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write 
enough data to fill the file (JSON_SMALL is ~4KB, 4 events = ~16KB) + */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file(logfile, 10 * 1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Wait a bit more to ensure flush completes and file size is updated */ + flb_time_msleep(1500); + + /* Write additional data to trigger rotation (4 more events = ~16KB more) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that the original file exists */ + fp = fopen(logfile, "r"); + TEST_CHECK(fp != NULL); + if (fp != NULL) { + fclose(fp); + } + + /* Check that at least one rotated file exists: "flb_test_logrotate.log.*" */ + TEST_CHECK( + count_files_in_directory(TEST_LOGPATH, "flb_test_logrotate.log.") >= 1); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_gzip_compression(void) { + int i; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "5K", "max_files", "3", "gzip", "true", + NULL) == 0); + + ret = 
flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to for rotation to happen (JSON_SMALL is ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + /* Wait for file to be created */ + ret = wait_for_file(logfile, 10 * 1024, TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger rotation (JSON_SMALL is ~4KB) */ + for (i = 0; i < 4; i++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(2000); /* waiting flush and rotation/compression */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that a gzipped rotated file exists: "flb_test_logrotate.log.*.gz" */ + ret = wait_for_file_pattern(TEST_LOGPATH, "flb_test_logrotate.log.", ".gz", + TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_max_files_cleanup(void) { + int i, j; + int ret; + int bytes; + char *p = (char *)JSON_SMALL; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + int file_count; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_size", "5K", "max_files", "3", "gzip", + "false", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Write enough data to trigger multiple rotations */ + for (i = 0; i < 5; i++) { /* Write 
~5MB to trigger multiple rotations */ + /* Write enough data to for rotation to happen (JSON_SMALL is ~4KB) */ + for (j = 0; j < 4; j++) { + bytes = flb_lib_push(ctx, in_ffd, p, strlen(p)); + TEST_CHECK(bytes == strlen(p)); + } + + flb_time_msleep(1500); /* waiting flush */ + + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= 4); + } + + flb_time_msleep(1500); /* waiting flush */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that only Max_Files + 1 files exist (current + rotated) */ + file_count = count_files_in_directory(TEST_LOGPATH, TEST_LOGFILE); + TEST_ASSERT(file_count >= 0); + TEST_CHECK(file_count <= + 4); /* Current file + 3 rotated files (max_files=3) */ + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_max_files_validation(void) { + flb_ctx_t *ctx; + int out_ffd; + char logfile[512]; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "off", NULL) == 0); + + /* Test with max_files = 0 */ + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_files", "0", NULL) == 0); + + /* Start should fail */ + TEST_CHECK(flb_start(ctx) == -1); + + flb_destroy(ctx); + + /* Test with max_files = -1 */ + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "off", NULL) == 0); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "max_files", "-1", NULL) == 0); + + /* Start should fail */ + 
TEST_CHECK(flb_start(ctx) == -1); + + flb_destroy(ctx); + + /* Clean up directory */ + recursive_delete_directory(TEST_LOGPATH); +} + +void flb_test_logrotate_gzip_compression_exact_chunk(void) { + int ret; + int bytes; + flb_ctx_t *ctx; + int in_ffd; + int out_ffd; + char logfile[512]; + char *large_message; + char *json_payload; + size_t msg_size = 64 * 1024; /* 64KB exact chunk size */ + size_t json_size; + + /* Clean up any existing directory and contents */ + recursive_delete_directory(TEST_LOGPATH); + TEST_MKDIR(TEST_LOGPATH); + snprintf(logfile, sizeof(logfile), "%s" PATH_SEPARATOR "%s", TEST_LOGPATH, + TEST_LOGFILE); + + ctx = flb_create(); + TEST_ASSERT(flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", + "error", NULL) == 0); + + in_ffd = flb_input(ctx, (char *)"lib", NULL); + TEST_CHECK(in_ffd >= 0); + flb_input_set(ctx, in_ffd, "tag", "test", NULL); + + out_ffd = flb_output(ctx, (char *)"logrotate", NULL); + TEST_CHECK(out_ffd >= 0); + TEST_ASSERT(flb_output_set(ctx, out_ffd, "match", "test", "file", logfile, + "format", "template", "template", "{message}", + "max_size", "64K", "max_files", "3", "gzip", + "true", NULL) == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + /* Prepare 64KB message */ + large_message = flb_malloc(msg_size + 1); + TEST_CHECK(large_message != NULL); + memset(large_message, 'A', msg_size); + large_message[msg_size] = '\0'; + + /* Create JSON payload: [timestamp, {"message": "..."}] */ + /* Estimate size: msg_size + overhead */ + json_size = msg_size + 100; + json_payload = flb_malloc(json_size); + TEST_CHECK(json_payload != NULL); + + snprintf(json_payload, json_size, "[%lu, {\"message\": \"%s\"}]", time(NULL), + large_message); + + /* Write exactly 64KB of data (the message content) */ + bytes = flb_lib_push(ctx, in_ffd, json_payload, strlen(json_payload)); + TEST_CHECK(bytes == strlen(json_payload)); + + flb_free(large_message); + flb_free(json_payload); + + /* Wait for flush and file creation */ + 
flb_time_msleep(1500); + + /* Trigger rotation by writing one more small record */ + char *small_payload = "[1234567890, {\"message\": \"trigger\"}]"; + bytes = flb_lib_push(ctx, in_ffd, small_payload, strlen(small_payload)); + TEST_CHECK(bytes == strlen(small_payload)); + + flb_time_msleep(2000); /* waiting flush and rotation/compression */ + + flb_stop(ctx); + flb_destroy(ctx); + + /* Check that a gzipped rotated file exists */ + ret = wait_for_file_pattern(TEST_LOGPATH, "flb_test_logrotate.log.", ".gz", + TEST_TIMEOUT); + TEST_CHECK(ret == 0); + + /* Clean up directory and all contents */ + recursive_delete_directory(TEST_LOGPATH); +} \ No newline at end of file From 716e1d5f0226bd8b66081186c36ee2fe331fe9f0 Mon Sep 17 00:00:00 2001 From: SagiROosto Date: Thu, 14 Aug 2025 16:35:16 +0300 Subject: [PATCH 3/6] aws_credentials: add IoT provider support and related credentials definitions This update introduces the IoT provider creation function and adds necessary environment variable definitions for IoT credentials in the AWS module. Additionally, the CMake configuration is updated to include the new IoT credentials source file. 
Signed-off-by: Eduardo Silva Signed-off-by: SagiROosto --- include/fluent-bit/flb_aws_credentials.h | 5 + src/aws/CMakeLists.txt | 1 + src/aws/flb_aws_credentials.c | 21 + src/aws/flb_aws_credentials_iot.c | 651 +++++++++++++++++++++++ 4 files changed, 678 insertions(+) create mode 100644 src/aws/flb_aws_credentials_iot.c diff --git a/include/fluent-bit/flb_aws_credentials.h b/include/fluent-bit/flb_aws_credentials.h index 36652ead45a..e4eebb0627f 100644 --- a/include/fluent-bit/flb_aws_credentials.h +++ b/include/fluent-bit/flb_aws_credentials.h @@ -225,6 +225,11 @@ struct flb_aws_provider *flb_eks_provider_create(struct flb_config *config, flb_aws_client_generator *generator); +/* + * IoT Provider + */ +struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, + struct flb_aws_client_generator *generator); /* * STS Assume Role Provider. diff --git a/src/aws/CMakeLists.txt b/src/aws/CMakeLists.txt index 941e811b633..de7d491cb47 100644 --- a/src/aws/CMakeLists.txt +++ b/src/aws/CMakeLists.txt @@ -15,6 +15,7 @@ set(src "flb_aws_imds.c" "flb_aws_credentials_http.c" "flb_aws_credentials_profile.c" + "flb_aws_credentials_iot.c" ) message(STATUS "=== AWS Credentials ===") diff --git a/src/aws/flb_aws_credentials.c b/src/aws/flb_aws_credentials.c index 75f13b111f2..c30a1c12ab1 100644 --- a/src/aws/flb_aws_credentials.c +++ b/src/aws/flb_aws_credentials.c @@ -38,6 +38,14 @@ #define EKS_POD_EXECUTION_ROLE "EKS_POD_EXECUTION_ROLE" +/* IoT Credentials Environment Variables */ +#define AWS_IOT_KEY_FILE "AWS_IOT_KEY_FILE" +#define AWS_IOT_CERT_FILE "AWS_IOT_CERT_FILE" +#define AWS_IOT_CA_CERT_FILE "AWS_IOT_CA_CERT_FILE" +#define AWS_IOT_CREDENTIALS_ENDPOINT "AWS_IOT_CREDENTIALS_ENDPOINT" +#define AWS_IOT_THING_NAME "AWS_IOT_THING_NAME" +#define AWS_IOT_ROLE_ALIAS "AWS_IOT_ROLE_ALIAS" + /* declarations */ static struct flb_aws_provider *standard_chain_create(struct flb_config *config, @@ -51,6 +59,10 @@ static struct flb_aws_provider 
*standard_chain_create(struct flb_config int eks_irsa, char *profile); +/* IoT Provider declaration */ +struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, + struct flb_aws_client_generator *generator); + /* * The standard credential provider chain: @@ -59,6 +71,7 @@ static struct flb_aws_provider *standard_chain_create(struct flb_config * 3. EKS OIDC * 4. EC2 IMDS * 5. ECS HTTP credentials endpoint + * 6. IoT credentials endpoint * * This provider will evaluate each provider in order, returning the result * from the first provider that returns valid credentials. @@ -566,6 +579,14 @@ static struct flb_aws_provider *standard_chain_create(struct flb_config mk_list_add(&sub_provider->_head, &implementation->sub_providers); + /* IoT Provider - check early since it requires specific environment variables */ + sub_provider = flb_iot_provider_create(config, generator); + if (sub_provider) { + /* IoT provider can fail if we are not running in IoT */ + mk_list_add(&sub_provider->_head, &implementation->sub_providers); + flb_debug("[aws_credentials] Initialized IoT Provider in standard chain"); + } + flb_debug("[aws_credentials] creating profile %s provider", profile); sub_provider = flb_profile_provider_create(profile); if (sub_provider) { diff --git a/src/aws/flb_aws_credentials_iot.c b/src/aws/flb_aws_credentials_iot.c new file mode 100644 index 00000000000..a94be0a0a3a --- /dev/null +++ b/src/aws/flb_aws_credentials_iot.c @@ -0,0 +1,651 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2024 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/* IoT Credentials Environment Variables */ +#define AWS_IOT_KEY_FILE "AWS_IOT_KEY_FILE" +#define AWS_IOT_CERT_FILE "AWS_IOT_CERT_FILE" +#define AWS_IOT_CA_CERT_FILE "AWS_IOT_CA_CERT_FILE" +#define AWS_IOT_CREDENTIALS_ENDPOINT "AWS_IOT_CREDENTIALS_ENDPOINT" +#define AWS_IOT_THING_NAME "AWS_IOT_THING_NAME" +#define AWS_IOT_ROLE_ALIAS "AWS_IOT_ROLE_ALIAS" + +/* IoT Provider */ +struct flb_aws_provider_iot { + struct flb_aws_credentials *creds; + time_t next_refresh; + + struct flb_aws_client *client; + + /* IoT specific configuration */ + char *key_file; + char *cert_file; + char *ca_cert_file; + char *credentials_endpoint; + char *thing_name; + char *role_alias; + + /* TLS configuration for IoT certificates */ + struct flb_tls *tls; + + /* Static header for thing name */ + struct flb_aws_header thing_name_header; +}; + +/* Forward declarations */ +static int iot_credentials_request(struct flb_aws_provider_iot *implementation); +static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, size_t response_len, time_t *expiration); + +struct flb_aws_credentials *get_credentials_fn_iot(struct flb_aws_provider *provider) +{ + struct flb_aws_credentials *creds = NULL; + int refresh = FLB_FALSE; + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Requesting credentials from the " + "IoT provider.."); + + /* a negative next_refresh means that 
auto-refresh is disabled */ + if (implementation->next_refresh > 0 + && time(NULL) > implementation->next_refresh) { + refresh = FLB_TRUE; + } + if (!implementation->creds || refresh == FLB_TRUE) { + if (try_lock_provider(provider)) { + flb_debug("[aws_credentials] IoT Provider: Refreshing credential " + "cache."); + iot_credentials_request(implementation); + unlock_provider(provider); + } + } + + if (!implementation->creds) { + /* + * We failed to lock the provider and creds are unset. This means that + * another co-routine is performing the refresh. + */ + flb_warn("[aws_credentials] No cached credentials are available and " + "a credential refresh is already in progress. The current " + "co-routine will retry."); + + return NULL; + } + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + goto error; + } + + creds->access_key_id = flb_sds_create(implementation->creds->access_key_id); + if (!creds->access_key_id) { + goto error; + } + + creds->secret_access_key = flb_sds_create(implementation->creds-> + secret_access_key); + if (!creds->secret_access_key) { + goto error; + } + + if (implementation->creds->session_token) { + creds->session_token = flb_sds_create(implementation->creds-> + session_token); + if (!creds->session_token) { + goto error; + } + } else { + creds->session_token = NULL; + } + + return creds; + +error: + flb_errno(); + flb_aws_credentials_destroy(creds); + return NULL; +} + +int refresh_fn_iot(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Refresh called on the IoT provider"); + + if (try_lock_provider(provider)) { + ret = iot_credentials_request(implementation); + unlock_provider(provider); + } + return ret; +} + +int init_fn_iot(struct flb_aws_provider *provider) { + int ret = -1; + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Init called on the 
IoT provider"); + + implementation->client->debug_only = FLB_TRUE; + + if (try_lock_provider(provider)) { + ret = iot_credentials_request(implementation); + unlock_provider(provider); + } + + implementation->client->debug_only = FLB_FALSE; + return ret; +} + +void sync_fn_iot(struct flb_aws_provider *provider) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Sync called on the IoT provider"); + /* Remove async flag */ + flb_stream_disable_async_mode(&implementation->client->upstream->base); +} + +void async_fn_iot(struct flb_aws_provider *provider) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] Async called on the IoT provider"); + /* Add async flag */ + flb_stream_enable_async_mode(&implementation->client->upstream->base); +} + +void upstream_set_fn_iot(struct flb_aws_provider *provider, + struct flb_output_instance *ins) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + flb_debug("[aws_credentials] upstream_set called on the IoT provider"); + /* Associate output and upstream */ + flb_output_upstream_set(implementation->client->upstream, ins); +} + +void destroy_fn_iot(struct flb_aws_provider *provider) { + struct flb_aws_provider_iot *implementation = provider->implementation; + + if (implementation) { + if (implementation->creds) { + flb_aws_credentials_destroy(implementation->creds); + } + + if (implementation->client) { + flb_aws_client_destroy(implementation->client); + } + + if (implementation->tls) { + flb_tls_destroy(implementation->tls); + } + + if (implementation->key_file) { + flb_free(implementation->key_file); + } + if (implementation->cert_file) { + flb_free(implementation->cert_file); + } + if (implementation->ca_cert_file) { + flb_free(implementation->ca_cert_file); + } + if (implementation->credentials_endpoint) { + flb_free(implementation->credentials_endpoint); + } + if 
(implementation->thing_name) { + flb_free(implementation->thing_name); + } + if (implementation->role_alias) { + flb_free(implementation->role_alias); + } + + flb_free(implementation); + provider->implementation = NULL; + } + + return; +} + +static struct flb_aws_provider_vtable iot_provider_vtable = { + .get_credentials = get_credentials_fn_iot, + .init = init_fn_iot, + .refresh = refresh_fn_iot, + .destroy = destroy_fn_iot, + .sync = sync_fn_iot, + .async = async_fn_iot, + .upstream_set = upstream_set_fn_iot, +}; + +struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, + struct flb_aws_client_generator *generator) +{ + struct flb_aws_provider_iot *implementation = NULL; + struct flb_aws_provider *provider = NULL; + struct flb_upstream *upstream = NULL; + char *endpoint_path = NULL; + flb_sds_t protocol = NULL; + flb_sds_t host = NULL; + flb_sds_t port_sds = NULL; + int port = 443; + int ret; + + /* Check if IoT environment variables are set */ + char *key_file = getenv(AWS_IOT_KEY_FILE); + char *cert_file = getenv(AWS_IOT_CERT_FILE); + char *ca_cert_file = getenv(AWS_IOT_CA_CERT_FILE); + char *credentials_endpoint = getenv(AWS_IOT_CREDENTIALS_ENDPOINT); + char *thing_name = getenv(AWS_IOT_THING_NAME); + char *role_alias = getenv(AWS_IOT_ROLE_ALIAS); + + if (!key_file || !cert_file || !ca_cert_file || !credentials_endpoint || + !thing_name || !role_alias) { + flb_debug("[aws_credentials] Not initializing IoT provider because " + "required environment variables are not set"); + return NULL; + } + + provider = flb_calloc(1, sizeof(struct flb_aws_provider)); + if (!provider) { + flb_errno(); + return NULL; + } + + pthread_mutex_init(&provider->lock, NULL); + + implementation = flb_calloc(1, sizeof(struct flb_aws_provider_iot)); + if (!implementation) { + flb_free(provider); + flb_errno(); + return NULL; + } + + provider->provider_vtable = &iot_provider_vtable; + provider->implementation = implementation; + + /* Store IoT configuration */ + 
implementation->key_file = flb_strdup(key_file); + implementation->cert_file = flb_strdup(cert_file); + implementation->ca_cert_file = flb_strdup(ca_cert_file); + implementation->credentials_endpoint = flb_strdup(credentials_endpoint); + implementation->thing_name = flb_strdup(thing_name); + implementation->role_alias = flb_strdup(role_alias); + + /* Parse the credentials endpoint URL */ + ret = flb_utils_url_split_sds(credentials_endpoint, &protocol, &host, &port_sds, &endpoint_path); + if (ret < 0) { + flb_error("[aws_credentials] Invalid IoT credentials endpoint URL: %s", credentials_endpoint); + goto error; + } + + if (port_sds != NULL) { + port = atoi(port_sds); + if (port == 0) { + flb_error("[aws_credentials] Invalid port in IoT credentials endpoint: %s", port_sds); + goto error; + } + } + + /* Create TLS configuration for IoT certificates */ + flb_debug("[aws_credentials] Creating TLS instance with cert: %s, key: %s, ca: %s", + implementation->cert_file, implementation->key_file, implementation->ca_cert_file); + + implementation->tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + FLB_TRUE, /* debug - enable TLS debug */ + NULL, /* vhost */ + NULL, /* ca_path */ + implementation->ca_cert_file, + implementation->cert_file, + implementation->key_file, + NULL); /* key_passwd */ + if (!implementation->tls) { + flb_error("[aws_credentials] Failed to create TLS instance for IoT Provider"); + goto error; + } + + flb_debug("[aws_credentials] TLS instance created successfully"); + + /* Create upstream connection */ + flb_debug("[aws_credentials] Creating upstream connection to %s:%d", host, port); + upstream = flb_upstream_create(config, host, port, FLB_IO_TLS, implementation->tls); + if (!upstream) { + flb_error("[aws_credentials] IoT Provider: connection initialization error"); + goto error; + } + + flb_debug("[aws_credentials] Upstream connection created successfully"); + + upstream->base.net.connect_timeout = FLB_AWS_CREDENTIAL_NET_TIMEOUT; + + 
implementation->client = generator->create(); + if (!implementation->client) { + flb_aws_provider_destroy(provider); + flb_upstream_destroy(upstream); + flb_error("[aws_credentials] IoT Provider: client creation error"); + return NULL; + } + + implementation->client->name = "iot_provider_client"; + implementation->client->has_auth = FLB_FALSE; + implementation->client->provider = NULL; + implementation->client->region = NULL; + implementation->client->service = NULL; + implementation->client->port = port; + implementation->client->flags = 0; + implementation->client->proxy = NULL; + implementation->client->upstream = upstream; + + flb_debug("[aws_credentials] IoT client configured: name=%s, port=%d, has_auth=%d", + implementation->client->name, implementation->client->port, implementation->client->has_auth); + + /* Set up the thing name header */ + implementation->thing_name_header.key = "x-amzn-iot-thingname"; + implementation->thing_name_header.key_len = 22; + implementation->thing_name_header.val = implementation->thing_name; + implementation->thing_name_header.val_len = strlen(implementation->thing_name); + + flb_debug("[aws_credentials] Setting IoT thing name header: %s = %s", + implementation->thing_name_header.key, implementation->thing_name_header.val); + + /* Set the static headers for the client */ + implementation->client->static_headers = &implementation->thing_name_header; + implementation->client->static_headers_len = 1; + + /* Clean up temporary variables */ + flb_sds_destroy(protocol); + flb_sds_destroy(host); + flb_sds_destroy(port_sds); + flb_sds_destroy(endpoint_path); + + return provider; + +error: + flb_aws_provider_destroy(provider); + flb_sds_destroy(protocol); + flb_sds_destroy(host); + flb_sds_destroy(port_sds); + flb_sds_destroy(endpoint_path); + return NULL; +} + +static int iot_credentials_request(struct flb_aws_provider_iot *implementation) +{ + struct flb_aws_credentials *creds = NULL; + struct flb_http_client *c = NULL; + time_t 
expiration; + flb_sds_t uri = NULL; + int ret; + + flb_debug("[aws_credentials] Calling IoT credentials endpoint.."); + + /* Construct the URI for the IoT credentials request */ + uri = flb_sds_create_size(256); + if (!uri) { + flb_errno(); + return -1; + } + + uri = flb_sds_printf(&uri, "/role-aliases/%s/credentials", implementation->role_alias); + if (!uri) { + return -1; + } + + /* Make the HTTP request */ + flb_debug("[aws_credentials] Making IoT credentials request to: %s", uri); + flb_debug("[aws_credentials] Client headers count: %d", implementation->client->static_headers_len); + if (implementation->client->static_headers_len > 0) { + flb_debug("[aws_credentials] Client header: %s = %s", + implementation->client->static_headers[0].key, + implementation->client->static_headers[0].val); + } + + c = implementation->client->client_vtable->request(implementation->client, FLB_HTTP_GET, + uri, NULL, 0, NULL, 0); + + flb_sds_destroy(uri); + + if (!c) { + flb_error("[aws_credentials] IoT credentials request failed - no response"); + return -1; + } + + flb_debug("[aws_credentials] IoT credentials response status: %d", c->resp.status); + flb_debug("[aws_credentials] IoT credentials response size: %zu", c->resp.payload_size); + + if (c->resp.status != 200) { + flb_error("[aws_credentials] IoT credentials request failed with status: %d", c->resp.status); + if (c->resp.payload_size > 0) { + flb_aws_print_error_code(c->resp.payload, c->resp.payload_size, + "IoTCredentialsProvider"); + } + flb_http_client_destroy(c); + return -1; + } + + /* Debug: Log the actual response from IoT credentials endpoint */ + flb_debug("[aws_credentials] IoT credentials response (size: %zu): %.*s", + c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload); + + /* Parse the credentials response - IoT endpoint may have different format */ + creds = flb_parse_iot_credentials(c->resp.payload, c->resp.payload_size, &expiration); + if (!creds) { + flb_debug("[aws_credentials] Failed to 
parse IoT credentials response"); + flb_http_client_destroy(c); + return -1; + } + + /* Destroy existing credentials */ + flb_aws_credentials_destroy(implementation->creds); + implementation->creds = NULL; + + implementation->creds = creds; + implementation->next_refresh = expiration - FLB_AWS_REFRESH_WINDOW; + flb_http_client_destroy(c); + + return 0; +} + +/* + * Parse IoT credentials response. + * AWS IoT credentials endpoint returns a JSON response with credentials. + * The format may be different from standard AWS credentials endpoints. + */ +static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, size_t response_len, time_t *expiration) +{ + jsmntok_t *tokens = NULL; + const jsmntok_t *t = NULL; + char *current_token = NULL; + jsmn_parser parser; + int tokens_size = 50; + size_t size; + int ret; + struct flb_aws_credentials *creds = NULL; + int i = 0; + int len; + flb_sds_t tmp; + + /* + * Remove/reset existing value of expiration. + * Expiration should be in the response, but it is not + * strictly speaking needed. Fluent Bit logs a warning if it is missing. 
+ */ + *expiration = -1; + + jsmn_init(&parser); + + size = sizeof(jsmntok_t) * tokens_size; + tokens = flb_calloc(1, size); + if (!tokens) { + goto error; + } + + ret = jsmn_parse(&parser, response, response_len, tokens, tokens_size); + + if (ret == JSMN_ERROR_INVAL || ret == JSMN_ERROR_PART) { + flb_error("[aws_credentials] Could not parse IoT credentials response - invalid JSON."); + goto error; + } + + /* Shouldn't happen, but just in case, check for too many tokens error */ + if (ret == JSMN_ERROR_NOMEM) { + flb_error("[aws_credentials] Could not parse IoT credentials response - response contained more tokens than expected."); + goto error; + } + + /* return value is number of tokens parsed */ + tokens_size = ret; + + creds = flb_calloc(1, sizeof(struct flb_aws_credentials)); + if (!creds) { + flb_errno(); + goto error; + } + + /* + * jsmn will create an array of tokens like: + * key, value, key, value + * For IoT credentials, the structure is: + * {"credentials": {"accessKeyId": "...", "secretAccessKey": "...", ...}} + */ + while (i < (tokens_size - 1)) { + t = &tokens[i]; + + if (t->start == -1 || t->end == -1 || (t->start == 0 && t->end == 0)) { + break; + } + + if (t->type == JSMN_STRING) { + current_token = &response[t->start]; + len = t->end - t->start; + + /* Check for credentials wrapper object */ + if (strncmp(current_token, "credentials", len) == 0) { + /* Skip the credentials object - we'll process its contents */ + i++; + continue; + } + + /* Check for AccessKeyId field (case insensitive) */ + if (strncmp(current_token, "accessKeyId", len) == 0 || + strncmp(current_token, "AccessKeyId", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->access_key_id != NULL) { + flb_error("Trying to double allocate access_key_id"); + goto error; + } + creds->access_key_id = flb_sds_create_len(current_token, len); + if (!creds->access_key_id) { + flb_errno(); + goto error; + } + continue; + } + /* 
Check for SecretAccessKey field (case insensitive) */ + if (strncmp(current_token, "secretAccessKey", len) == 0 || + strncmp(current_token, "SecretAccessKey", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->secret_access_key != NULL) { + flb_error("Trying to double allocate secret_access_key"); + goto error; + } + creds->secret_access_key = flb_sds_create_len(current_token, len); + if (!creds->secret_access_key) { + flb_errno(); + goto error; + } + continue; + } + /* Check for Token field (session token) - case insensitive */ + if (strncmp(current_token, "sessionToken", len) == 0 || + strncmp(current_token, "Token", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + if (creds->session_token != NULL) { + flb_error("Trying to double allocate session_token"); + goto error; + } + creds->session_token = flb_sds_create_len(current_token, len); + if (!creds->session_token) { + flb_errno(); + goto error; + } + continue; + } + /* Check for Expiration field (case insensitive) */ + if (strncmp(current_token, "expiration", len) == 0 || + strncmp(current_token, "Expiration", len) == 0) { + i++; + t = &tokens[i]; + current_token = &response[t->start]; + len = t->end - t->start; + tmp = flb_sds_create_len(current_token, len); + if (!tmp) { + flb_errno(); + goto error; + } + *expiration = flb_aws_cred_expiration(tmp); + if (*expiration < 0) { + flb_warn("[aws_credentials] '%s' was invalid or could not be parsed. 
Disabling auto-refresh of credentials.", tmp); + } + flb_sds_destroy(tmp); + } + } + + i++; + } + + if (creds->access_key_id == NULL) { + flb_error("[aws_credentials] Missing AccessKeyId field in IoT credentials response"); + goto error; + } + + if (creds->secret_access_key == NULL) { + flb_error("[aws_credentials] Missing SecretAccessKey field in IoT credentials response"); + goto error; + } + + flb_debug("[aws_credentials] Successfully parsed IoT credentials - AccessKeyId: %s, Expiration: %ld", + creds->access_key_id, *expiration); + + flb_free(tokens); + return creds; + +error: + flb_aws_credentials_destroy(creds); + flb_free(tokens); + return NULL; +} \ No newline at end of file From 0d71e191dc46e80d1fe7dbac795ea20b87c0d45a Mon Sep 17 00:00:00 2001 From: SagiROosto Date: Mon, 18 Aug 2025 10:34:55 +0300 Subject: [PATCH 4/6] aws_credentials: fix CR comments Signed-off-by: SagiROosto --- include/fluent-bit/flb_aws_credentials.h | 8 +++++ src/aws/flb_aws_credentials.c | 13 -------- src/aws/flb_aws_credentials_iot.c | 40 +++++++++--------------- 3 files changed, 23 insertions(+), 38 deletions(-) diff --git a/include/fluent-bit/flb_aws_credentials.h b/include/fluent-bit/flb_aws_credentials.h index e4eebb0627f..cacaa492ad4 100644 --- a/include/fluent-bit/flb_aws_credentials.h +++ b/include/fluent-bit/flb_aws_credentials.h @@ -34,6 +34,14 @@ /* 5 second timeout for credential related http requests */ #define FLB_AWS_CREDENTIAL_NET_TIMEOUT 5 +/* IoT Credentials Environment Variables */ +#define AWS_IOT_KEY_FILE "AWS_IOT_KEY_FILE" +#define AWS_IOT_CERT_FILE "AWS_IOT_CERT_FILE" +#define AWS_IOT_CA_CERT_FILE "AWS_IOT_CA_CERT_FILE" +#define AWS_IOT_CREDENTIALS_ENDPOINT "AWS_IOT_CREDENTIALS_ENDPOINT" +#define AWS_IOT_THING_NAME "AWS_IOT_THING_NAME" +#define AWS_IOT_ROLE_ALIAS "AWS_IOT_ROLE_ALIAS" + /* * A structure that wraps the sensitive data needed to sign an AWS request */ diff --git a/src/aws/flb_aws_credentials.c b/src/aws/flb_aws_credentials.c index 
c30a1c12ab1..94e2515279a 100644 --- a/src/aws/flb_aws_credentials.c +++ b/src/aws/flb_aws_credentials.c @@ -38,14 +38,6 @@ #define EKS_POD_EXECUTION_ROLE "EKS_POD_EXECUTION_ROLE" -/* IoT Credentials Environment Variables */ -#define AWS_IOT_KEY_FILE "AWS_IOT_KEY_FILE" -#define AWS_IOT_CERT_FILE "AWS_IOT_CERT_FILE" -#define AWS_IOT_CA_CERT_FILE "AWS_IOT_CA_CERT_FILE" -#define AWS_IOT_CREDENTIALS_ENDPOINT "AWS_IOT_CREDENTIALS_ENDPOINT" -#define AWS_IOT_THING_NAME "AWS_IOT_THING_NAME" -#define AWS_IOT_ROLE_ALIAS "AWS_IOT_ROLE_ALIAS" - /* declarations */ static struct flb_aws_provider *standard_chain_create(struct flb_config *config, @@ -59,11 +51,6 @@ static struct flb_aws_provider *standard_chain_create(struct flb_config int eks_irsa, char *profile); -/* IoT Provider declaration */ -struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, - struct flb_aws_client_generator *generator); - - /* * The standard credential provider chain: * 1. Environment variables diff --git a/src/aws/flb_aws_credentials_iot.c b/src/aws/flb_aws_credentials_iot.c index a94be0a0a3a..988021d35b9 100644 --- a/src/aws/flb_aws_credentials_iot.c +++ b/src/aws/flb_aws_credentials_iot.c @@ -25,6 +25,8 @@ #include #include #include +#include +#include #include #include @@ -32,14 +34,6 @@ #include #include -/* IoT Credentials Environment Variables */ -#define AWS_IOT_KEY_FILE "AWS_IOT_KEY_FILE" -#define AWS_IOT_CERT_FILE "AWS_IOT_CERT_FILE" -#define AWS_IOT_CA_CERT_FILE "AWS_IOT_CA_CERT_FILE" -#define AWS_IOT_CREDENTIALS_ENDPOINT "AWS_IOT_CREDENTIALS_ENDPOINT" -#define AWS_IOT_THING_NAME "AWS_IOT_THING_NAME" -#define AWS_IOT_ROLE_ALIAS "AWS_IOT_ROLE_ALIAS" - /* IoT Provider */ struct flb_aws_provider_iot { struct flb_aws_credentials *creds; @@ -377,22 +371,18 @@ struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, implementation->client->static_headers = &implementation->thing_name_header; implementation->client->static_headers_len = 1; - /* Clean up 
temporary variables */ - flb_sds_destroy(protocol); - flb_sds_destroy(host); - flb_sds_destroy(port_sds); - flb_sds_destroy(endpoint_path); - - return provider; - -error: - flb_aws_provider_destroy(provider); - flb_sds_destroy(protocol); - flb_sds_destroy(host); - flb_sds_destroy(port_sds); - flb_sds_destroy(endpoint_path); - return NULL; -} + goto cleanup; // At the end of the func + error: + flb_aws_provider_destroy(provider); + provider = NULL; + // if no return it just keep executing :) + cleanup: + flb_sds_destroy(protocol); + flb_sds_destroy(host); + flb_sds_destroy(port_sds); + flb_sds_destroy(endpoint_path); + return provider; + } static int iot_credentials_request(struct flb_aws_provider_iot *implementation) { @@ -648,4 +638,4 @@ static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, siz flb_aws_credentials_destroy(creds); flb_free(tokens); return NULL; -} \ No newline at end of file +} From 6f8d43e37797fa00ec9f68d19fa0be0820332c92 Mon Sep 17 00:00:00 2001 From: SagiROosto Date: Wed, 20 Aug 2025 15:22:02 +0300 Subject: [PATCH 5/6] aws_credentials: fix copilot comments Signed-off-by: SagiROosto --- src/aws/flb_aws_credentials_iot.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/src/aws/flb_aws_credentials_iot.c b/src/aws/flb_aws_credentials_iot.c index 988021d35b9..1fe805371d9 100644 --- a/src/aws/flb_aws_credentials_iot.c +++ b/src/aws/flb_aws_credentials_iot.c @@ -371,18 +371,17 @@ struct flb_aws_provider *flb_iot_provider_create(struct flb_config *config, implementation->client->static_headers = &implementation->thing_name_header; implementation->client->static_headers_len = 1; - goto cleanup; // At the end of the func - error: - flb_aws_provider_destroy(provider); - provider = NULL; - // if no return it just keep executing :) cleanup: flb_sds_destroy(protocol); flb_sds_destroy(host); flb_sds_destroy(port_sds); flb_sds_destroy(endpoint_path); return provider; - } + error: + 
flb_aws_provider_destroy(provider); + provider = NULL; + goto cleanup; +} static int iot_credentials_request(struct flb_aws_provider_iot *implementation) { @@ -549,7 +548,7 @@ static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, siz current_token = &response[t->start]; len = t->end - t->start; if (creds->access_key_id != NULL) { - flb_error("Trying to double allocate access_key_id"); + flb_error("[aws_credentials] Trying to double allocate access_key_id"); goto error; } creds->access_key_id = flb_sds_create_len(current_token, len); @@ -567,7 +566,7 @@ static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, siz current_token = &response[t->start]; len = t->end - t->start; if (creds->secret_access_key != NULL) { - flb_error("Trying to double allocate secret_access_key"); + flb_error("[aws_credentials] Trying to double allocate secret_access_key"); goto error; } creds->secret_access_key = flb_sds_create_len(current_token, len); @@ -585,7 +584,7 @@ static struct flb_aws_credentials *flb_parse_iot_credentials(char *response, siz current_token = &response[t->start]; len = t->end - t->start; if (creds->session_token != NULL) { - flb_error("Trying to double allocate session_token"); + flb_error("[aws_credentials] Trying to double allocate session_token"); goto error; } creds->session_token = flb_sds_create_len(current_token, len); From 5910692c2e4fe5ccc0bd41fa61cd71fb4700bd79 Mon Sep 17 00:00:00 2001 From: SagiROosto Date: Thu, 21 Aug 2025 10:46:53 +0300 Subject: [PATCH 6/6] aws_credentials: add https to endpoint if no scheme supplied Signed-off-by: SagiROosto --- src/aws/flb_aws_credentials_iot.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/aws/flb_aws_credentials_iot.c b/src/aws/flb_aws_credentials_iot.c index 1fe805371d9..0f27f688bef 100644 --- a/src/aws/flb_aws_credentials_iot.c +++ b/src/aws/flb_aws_credentials_iot.c @@ -290,6 +290,21 @@ struct flb_aws_provider *flb_iot_provider_create(struct 
flb_config *config, implementation->thing_name = flb_strdup(thing_name); implementation->role_alias = flb_strdup(role_alias); + /* Ensure credentials_endpoint has http or https scheme, default to https:// if missing */ + if (strncmp(credentials_endpoint, "http://", 7) != 0 && + strncmp(credentials_endpoint, "https://", 8) != 0) { + flb_sds_t tmp = flb_sds_create_size(strlen(credentials_endpoint) + 8 + 1); + if (!tmp) { + flb_error("[aws_credentials] Failed to allocate memory for credentials_endpoint"); + goto error; + } + flb_sds_cat(tmp, "https://", 8); + flb_sds_cat(tmp, credentials_endpoint, strlen(credentials_endpoint)); + flb_free(implementation->credentials_endpoint); + implementation->credentials_endpoint = tmp; + credentials_endpoint = implementation->credentials_endpoint; + } + /* Parse the credentials endpoint URL */ ret = flb_utils_url_split_sds(credentials_endpoint, &protocol, &host, &port_sds, &endpoint_path); if (ret < 0) {