-
Notifications
You must be signed in to change notification settings - Fork 19
WIP: File Receiver #272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: File Receiver #272
Conversation
…rsing, LogRecord generation to spawn_blocking, batch and send LogRecords
mheffner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lot here obviously, only did a cursory scan with some higher level questions. This may be a good one to have some high-level arch diagrams in the docs, maybe claude could generate? Happy to continue looking at specific portions, but I imagine lots of testing will help here.
One thought of a possible future improvement would be isolation of the components by unique device IDs monitored. The scenario I was thinking of was if you had files monitored across two attached EBS disks, if one were to lock up/block, would you want to continue operating on the other disk? 🤷 Definitely an optimization for the future, but just a thought I had looking.
Good stuff!
| self.process_active_file(path); | ||
| } | ||
|
|
||
| // Mark files as rotated if they're no longer at a glob-matching path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could a file be rotated, but still match a glob-matching path?
mheffner
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments here, a lot we discussed already though. I'm good with getting this in and continual testing as we expand use cases.
As to the generated benchmarks, I'd be more inclined to remove them to reduce size of repo. I find that stuff can get stale overtime, so may be better to track in different repos (esp since some is go).
src/receivers/file/receiver.rs
Outdated
| // Main event loop - flume channels support async recv directly, no bridge needed | ||
| loop { | ||
| select! { | ||
| _ = cancel.cancelled() => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly consider adding some bias to this select?
src/receivers/file/receiver.rs
Outdated
| } | ||
|
|
||
| // Process completed workers | ||
| Some(result) = worker_futures.next(), if !worker_futures.is_empty() => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if moving this arm above the previous one would, with a biased select, help clean up the old workers before spawning a new worker for the same file?
src/receivers/file/receiver.rs
Outdated
|
|
||
| let payload_msg = payload::Message::new(None, vec![resource_logs], None); | ||
|
|
||
| match logs_output.send(payload_msg).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could block here and prevent the other select arms from running?
| severity_number, | ||
| severity_text, | ||
| body: Some(AnyValue { | ||
| value: Some(any_value::Value::StringValue(line)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Todo comment for later as to whether to replace the body with something else from the parsed result, probably fine for nginx logs to keep the same.
| base64_decode_impl(s.as_bytes()) | ||
| } | ||
|
|
||
| // Simple base64 implementation to avoid adding another dependency |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we already have base64 as a dep?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I will remove this in a follow up PR 👍
👍 I've removed the generated code. |
….rs timeout for receivers shutdown to 3 seconds
Adds a new File Receiver to Rotel that reads and tails log files and converts them to OpenTelemetry logs. This receiver enables Rotel to ingest logs directly from files (like nginx access logs) without requiring a separate log shipper.
Key capabilities:
Design Overview
The receiver uses a coordinator/worker architecture:
The coordinator runs on a single OS thread and maintains exclusive ownership of file state. Workers handle blocking file I/O via tokio::spawn_blocking. The design ensures no duplicate work items per file are running concurrently at any time and provides backpressure through bounded channels.
Files Changed
Core receiver implementation (~5,800 lines):
Testing & benchmarking:
Documentation: