add preaggregation pipeline dual shipping support #2
base: main
Conversation
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Walkthrough

This update introduces a preaggregation pipeline feature throughout the metrics serialization and forwarding system. It adds new configuration options for preaggregation endpoints and API keys, modifies endpoint selection logic, and generalizes series payload marshaling to support multiple configurable pipelines. The changes refactor payload creation and submission to accommodate flexible filtering and routing, including preaggregation-specific handling.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Config as Config
    participant Serializer as Serializer
    participant Series as IterableSeries
    participant Forwarder as DefaultForwarder
    participant Endpoint as EndpointResolver
    Config->>Serializer: Provide settings (preaggr enabled, URLs, keys)
    Serializer->>Series: MarshalSplitCompressPipelines(pipelines)
    Series-->>Serializer: Payloads (with destinations)
    Serializer->>Forwarder: Submit payloads
    Forwarder->>Endpoint: Resolve endpoint for each payload
    Endpoint-->>Forwarder: Return endpoint (preaggr or standard)
    Forwarder->>Endpoint: Send payload to resolved endpoint
```
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (1.64.8): Error: build linters: plugin(pkgconfigusage): plugin "pkgconfigusage" not found
```go
payloads := make([]*transaction.BytesPayload, len(pbs))
for _, pb := range pbs {
	payloads = append(payloads, pb.payloads...)
}
```
There's an issue with the slice initialization in this code. The line `payloads := make([]*transaction.BytesPayload, len(pbs))` creates a slice with len(pbs) nil elements, but the code then appends more elements with `append(payloads, pb.payloads...)`. The result is len(pbs) nil entries followed by the actual payload elements (the standalone snippet below demonstrates the behavior).
To fix this, either:

- Initialize with zero length: `payloads := make([]*transaction.BytesPayload, 0)`, or simply `var payloads []*transaction.BytesPayload`
- Or pre-allocate the capacity up front and append into it:

```go
payloads := make([]*transaction.BytesPayload, 0, totalPayloadCount)
for _, pb := range pbs {
	payloads = append(payloads, pb.payloads...)
}
```

Suggested change:

```diff
-payloads := make([]*transaction.BytesPayload, len(pbs))
+payloads := make([]*transaction.BytesPayload, 0)
 for _, pb := range pbs {
 	payloads = append(payloads, pb.payloads...)
 }
```
```go
func setupPreaggregation(config pkgconfigmodel.Setup) {
	config.BindEnvAndSetDefault("enable_preaggr_pipeline", false)
	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
```
The URL https://api.datad0g.com appears to contain a typo, using the digit 0 instead of the letter o. Unless this is intentional (perhaps for testing purposes), consider changing it to https://api.datadoghq.com or the appropriate production URL to avoid potential connectivity issues.
```diff
-	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
+	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datadoghq.com")
```
Actionable comments posted: 6
🧹 Nitpick comments (1)
pkg/serializer/internal/metrics/iterable_series.go (1)
119-133: Improve error handling and resource management in pipeline initialization.

The initialization logic has some areas for improvement:

- Resource cleanup: If an error occurs after some PayloadsBuilders are created, the successfully created ones are not cleaned up.
- Pre-allocation: The slice initialization could benefit from proper capacity allocation.

Consider improving resource management (note that once the slice is created with zero length, the loop needs to range over `pipelines` rather than `pbs`):

```diff
-pbs := make([]*PayloadsBuilder, len(pipelines))
-for i := range pbs {
+pbs := make([]*PayloadsBuilder, 0, len(pipelines))
+for i := range pipelines {
 	bufferContext := marshaler.NewBufferContext()
 	pb, err := series.NewPayloadsBuilder(bufferContext, config, strategy)
 	if err != nil {
+		// Clean up previously created payload builders
+		for j := 0; j < i; j++ {
+			// Add cleanup if PayloadsBuilder has resources to clean
+		}
 		return nil, err
 	}
-	pbs[i] = &pb
+	pbs = append(pbs, &pb)
 	err = pbs[i].startPayload()
 	if err != nil {
+		// Clean up all payload builders including the current one
+		for j := 0; j <= i; j++ {
+			// Add cleanup if PayloadsBuilder has resources to clean
+		}
 		return nil, err
 	}
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- comp/forwarder/defaultforwarder/default_forwarder.go (1 hunks)
- comp/forwarder/defaultforwarder/endpoints/endpoints.go (1 hunks)
- comp/forwarder/defaultforwarder/transaction/transaction.go (2 hunks)
- pkg/config/setup/config.go (1 hunks)
- pkg/config/setup/preaggregation.go (1 hunks)
- pkg/config/utils/endpoints.go (1 hunks)
- pkg/serializer/internal/metrics/iterable_series.go (2 hunks)
- pkg/serializer/serializer.go (2 hunks)
- pkg/serializer/series_benchmark_test.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)

comp/forwarder/defaultforwarder/default_forwarder.go (2)
- comp/forwarder/defaultforwarder/transaction/transaction.go (2): Destination (198-198), PreaggrOnly (210-210)
- comp/forwarder/defaultforwarder/endpoints/endpoints.go (1): PreaggrSeriesEndpoint (29-29)

pkg/serializer/serializer.go (3)
- pkg/serializer/internal/metrics/iterable_series.go (1): Pipeline (109-112)
- comp/forwarder/defaultforwarder/transaction/transaction.go (6): Destination (198-198), PrimaryOnly (204-204), SecondaryOnly (206-206), LocalOnly (208-208), AllRegions (202-202), PreaggrOnly (210-210)
- comp/metadata/host/hostimpl/hosttags/tags.go (1): Get (109-203)

pkg/serializer/internal/metrics/iterable_series.go (2)
- pkg/metrics/series.go (1): Serie (49-63)
- comp/forwarder/defaultforwarder/transaction/transaction.go (1): Destination (198-198)
🔇 Additional comments (8)
pkg/config/setup/config.go (1)
236-236: Details

✅ Verification successful

Verify that the setupPreaggregation function is implemented.

The addition of setupPreaggregation to the serverless config components follows the correct pattern and aligns with the PR objectives. However, please ensure that the setupPreaggregation function is properly implemented.

🏁 Script executed:

```shell
#!/bin/bash
# Description: Verify the setupPreaggregation function exists and is properly implemented
# Expected: Function should exist in pkg/config/setup/preaggregation.go or similar location

# Search for the setupPreaggregation function definition
rg -A 10 "func setupPreaggregation"

# Search for preaggregation-related configuration keys mentioned in the AI summary
rg -B 2 -A 2 "enable_preaggr_pipeline|preaggr_dd_url|preaggr_api_key"
```

Length of output: 3291

setupPreaggregation function verified and properly implemented

The setupPreaggregation function is present in pkg/config/setup/preaggregation.go and correctly binds the required settings:

- enable_preaggr_pipeline (default false)
- preaggr_dd_url (default "https://api.datad0g.com")
- preaggr_api_key

All downstream usage in pkg/serializer/serializer.go and pkg/config/utils/endpoints.go references these keys as expected. No further changes needed - LGTM.

comp/forwarder/defaultforwarder/endpoints/endpoints.go (1)

28-29: LGTM! Clean endpoint addition.

The new preaggregation endpoint follows the established pattern and naming conventions. The route and name are well-defined and consistent with the existing codebase.
comp/forwarder/defaultforwarder/transaction/transaction.go (2)
209-210: LGTM! Proper enum extension.

The new PreaggrOnly destination constant is correctly added to the Destination type, following the established pattern.

223-224: LGTM! String method properly updated.

The String() method is correctly updated to handle the new PreaggrOnly case, maintaining consistency with the existing implementation pattern.

pkg/serializer/series_benchmark_test.go (1)

82-87: LGTM! Benchmark properly updated for new pipeline API.

The benchmark correctly adapts to the new pipeline-based marshaling approach. The pipeline configuration with a universal filter and AllRegions destination maintains the original benchmark intent while using the new flexible interface.

pkg/serializer/serializer.go (1)

350-360: LGTM! Simplified V1 API handling improves readability.

The V1 API handling logic is now much cleaner and easier to follow. The error handling and early return pattern is well-implemented.

pkg/serializer/internal/metrics/iterable_series.go (2)

109-112: LGTM! Well-designed Pipeline abstraction.

The Pipeline struct provides a clean abstraction for filtering and routing metrics. The use of function types for FilterFunc makes it flexible and reusable.
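For readers skimming the thread, the referenced struct presumably looks roughly like the sketch below; field names are inferred from the usages quoted elsewhere in this review, not copied from the file:

```go
// Sketch of the Pipeline abstraction described above (iterable_series.go, lines 109-112).
type Pipeline struct {
	// FilterFunc decides whether a given series is included in this pipeline's payloads.
	FilterFunc func(serie *metrics.Serie) bool
	// Destination selects where the resulting payloads are routed, e.g.
	// transaction.AllRegions, transaction.PrimaryOnly, or transaction.PreaggrOnly.
	Destination transaction.Destination
}
```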
137-146: Details
❓ Verification inconclusive
Verify pipeline filtering performance for high-volume scenarios.
The nested loop structure filters each series through all pipelines, which could be a performance concern for high-volume metrics scenarios.
Let's check if there are performance benchmarks for this code path:
The current approach is reasonable for the expected number of pipelines (typically 2-4), but consider adding performance monitoring if pipeline counts grow significantly.
🏁 Script executed:
```shell
#!/bin/bash
# Search for existing benchmarks related to series marshaling
fd -e go -t f | xargs grep -l "Benchmark.*[Ss]eries\|[Ss]eries.*Benchmark"

# Check for performance-related configuration or metrics
rg -A 5 -B 5 "serializer.*performance\|performance.*serializer"
```

Length of output: 300
Verify nested pipeline filtering performance
I didn’t find a dedicated benchmark or performance tuning for the nested loop in pkg/serializer/internal/metrics/iterable_series.go (lines 137–146). While pipelines typically number 2–4, please:
- Confirm that pkg/serializer/series_benchmark_test.go exercises this filtering logic across multiple pipelines.
- If it doesn’t, add a benchmark that runs the series through realistic pipeline counts (e.g. 4–8) to measure overhead.
- Consider instrumenting runtime metrics or caching strategies if pipeline counts may grow significantly.
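To make the second bullet concrete, here is a rough shape such a benchmark could take; `marshalWithPipelines` is a hypothetical helper standing in for the existing setup in series_benchmark_test.go, so this is a sketch rather than drop-in code:

```go
// Sketch: sweep pipeline counts to measure per-pipeline filtering overhead.
// marshalWithPipelines is a hypothetical wrapper around the existing benchmark setup.
func BenchmarkMarshalSplitCompressPipelineCounts(b *testing.B) {
	for _, numPipelines := range []int{2, 4, 8} {
		b.Run(fmt.Sprintf("pipelines_%d", numPipelines), func(b *testing.B) {
			// All pipelines accept every series, so the measurement isolates
			// the cost of running each series through N filter functions.
			pipelines := make([]metricsserializer.Pipeline, 0, numPipelines)
			for i := 0; i < numPipelines; i++ {
				pipelines = append(pipelines, metricsserializer.Pipeline{
					FilterFunc:  func(s *metrics.Serie) bool { return true },
					Destination: transaction.AllRegions,
				})
			}
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				marshalWithPipelines(b, pipelines)
			}
		})
	}
}
```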
```go
func setupPreaggregation(config pkgconfigmodel.Setup) {
	config.BindEnvAndSetDefault("enable_preaggr_pipeline", false)
	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
	config.BindEnv("preaggr_api_key")
}
```
💡 Verification agent
🧩 Analysis chain
Good configuration setup, but verify the default URL.
The configuration function properly sets up environment variable bindings with appropriate defaults. The approach of defaulting enable_preaggr_pipeline to false for safety is good practice.
However, please verify the default URL "https://api.datad0g.com" - the domain contains "datad0g" with a zero instead of "datadog". Ensure this is intentional and not a typo.
🏁 Script executed:
```shell
#!/bin/bash
# Description: Verify if the preaggregation URL domain is valid and related to Datadog

# Check if the domain resolves and is accessible
echo "Checking domain resolution for api.datad0g.com..."
nslookup api.datad0g.com
echo -e "\nChecking HTTP response..."
curl -I https://api.datad0g.com 2>/dev/null || echo "Failed to connect"
echo -e "\nFor comparison, checking standard Datadog domains..."
nslookup api.datadoghq.com
curl -I https://api.datadoghq.com 2>/dev/null || echo "Failed to connect"
```

Length of output: 1587
Correct default Datadog API URL
It looks like the domain in the default URL is a typo—api.datad0g.com (with a zero) isn’t the standard Datadog API endpoint. Please update it to the correct domain:
• pkg/config/setup/preaggregation.go, line 14
```diff
-	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
+	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datadoghq.com")
```

This ensures clients will hit the official Datadog API.
📝 Committable suggestion
```diff
 func setupPreaggregation(config pkgconfigmodel.Setup) {
 	config.BindEnvAndSetDefault("enable_preaggr_pipeline", false)
-	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
+	config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datadoghq.com")
 	config.BindEnv("preaggr_api_key")
 }
```
🤖 Prompt for AI Agents
In pkg/config/setup/preaggregation.go around lines 12 to 16, the default URL for
the preaggregation API is set to "https://api.datad0g.com" which contains a typo
with a zero instead of the letter "o". Update the default URL string to
"https://api.datadoghq.com" to ensure it points to the correct official Datadog
API endpoint.
```go
// Populate preaggregation endpoint (only if unique)
//
// TODO(?): This assumes we wouldn't have a unique preaggregation API key
// without a unique preaggregation URL, but we should validate that.
if c.GetBool("enable_preaggr_pipeline") {
	preaggURL := c.GetString("preaggr_dd_url")
	// Check that it's not the same as the primary URL
	if preaggURL != "" && preaggURL != ddURL {
		// Check if preaggregation URL already exists in additional endpoints
		if _, exists := additionalEndpoints[preaggURL]; !exists {
			// Unique URL - create new domain resolver with preaggregation API key
			additionalEndpoints[preaggURL] = []APIKeys{{
				ConfigSettingPath: "preaggr_api_key",
				Keys:              []string{c.GetString("preaggr_api_key")},
			}}
		}
	}
}
```
🛠️ Refactor suggestion
Review the preaggregation endpoint logic and address edge cases.
The preaggregation endpoint configuration logic is generally sound, but there are several concerns:
- Missing validation: The TODO comment on lines 171-172 highlights a valid concern - there's no validation that a unique preaggregation API key corresponds to a unique URL.
- Empty API key handling: No validation ensures preaggr_api_key is non-empty before creating the endpoint entry.
- URL validation: While the main URL gets validated via url.Parse() in the mergeAdditionalEndpoints call, the preaggregation URL only gets a basic string comparison check.
Consider adding validation to address these concerns:
```diff
 if c.GetBool("enable_preaggr_pipeline") {
 	preaggURL := c.GetString("preaggr_dd_url")
+	preaggAPIKey := c.GetString("preaggr_api_key")
+
+	// Validate preaggregation URL
+	if preaggURL != "" {
+		if _, err := url.Parse(preaggURL); err != nil {
+			return nil, fmt.Errorf("could not parse preaggr_dd_url: %s", err)
+		}
+	}
+
 	// Check that it's not the same as the primary URL
-	if preaggURL != "" && preaggURL != ddURL {
+	if preaggURL != "" && preaggURL != ddURL && preaggAPIKey != "" {
 		// Check if preaggregation URL already exists in additional endpoints
 		if _, exists := additionalEndpoints[preaggURL]; !exists {
 			// Unique URL - create new domain resolver with preaggregation API key
 			additionalEndpoints[preaggURL] = []APIKeys{{
 				ConfigSettingPath: "preaggr_api_key",
-				Keys:              []string{c.GetString("preaggr_api_key")},
+				Keys:              []string{preaggAPIKey},
 			}}
 		}
 	}
 }
```

📝 Committable suggestion
```go
// Populate preaggregation endpoint (only if unique)
//
// TODO(?): This assumes we wouldn't have a unique preaggregation API key
// without a unique preaggregation URL, but we should validate that.
if c.GetBool("enable_preaggr_pipeline") {
	preaggURL := c.GetString("preaggr_dd_url")
	preaggAPIKey := c.GetString("preaggr_api_key")
	// Validate preaggregation URL
	if preaggURL != "" {
		if _, err := url.Parse(preaggURL); err != nil {
			return nil, fmt.Errorf("could not parse preaggr_dd_url: %s", err)
		}
	}
	// Check that it's not the same as the primary URL
	if preaggURL != "" && preaggURL != ddURL && preaggAPIKey != "" {
		// Check if preaggregation URL already exists in additional endpoints
		if _, exists := additionalEndpoints[preaggURL]; !exists {
			// Unique URL - create new domain resolver with preaggregation API key
			additionalEndpoints[preaggURL] = []APIKeys{{
				ConfigSettingPath: "preaggr_api_key",
				Keys:              []string{preaggAPIKey},
			}}
		}
	}
}
```
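As a neutral illustration of the checks suggested above, a self-contained helper might look like the sketch below; the function name and signature are hypothetical and not part of this PR:

```go
import (
	"errors"
	"fmt"
	"net/url"
)

// validatePreaggrSettings sketches the proposed checks: a non-empty, parseable
// preaggregation URL, a non-empty API key, and a URL distinct from the primary
// endpoint. Illustration only; the real code would read these from the config.
func validatePreaggrSettings(preaggURL, preaggAPIKey, primaryURL string) error {
	if preaggURL == "" {
		return errors.New("preaggr_dd_url is empty")
	}
	if _, err := url.Parse(preaggURL); err != nil {
		return fmt.Errorf("could not parse preaggr_dd_url: %w", err)
	}
	if preaggAPIKey == "" {
		return errors.New("preaggr_api_key is empty")
	}
	if preaggURL == primaryURL {
		return errors.New("preaggr_dd_url must differ from the primary dd_url")
	}
	return nil
}
```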
🤖 Prompt for AI Agents
In pkg/config/utils/endpoints.go around lines 169 to 186, enhance the
preaggregation endpoint logic by adding validation to ensure the preaggregation
API key is non-empty before creating the endpoint entry, verify that the
preaggregation API key corresponds to a unique preaggregation URL, and perform
proper URL validation on the preaggregation URL instead of just string
comparison. Implement these checks before adding the preaggregation endpoint to
additionalEndpoints to handle edge cases and improve robustness.
```go
// If the payload has a preaggr destination, but the current domain
// is not the configured preaggr site, do not create a transaction.
//
// If we have a preaggr payload and domain, switch from the standard
// series endpoint to the preaggr-specific series endpoint.
if payload.Destination == transaction.PreaggrOnly {
	if drDomain != f.config.GetString("preaggr_dd_url") {
		continue
	} else {
		endpoint = endpoints.PreaggrSeriesEndpoint
	}
}
// TODO(?): If the preaggr_dd_url is the same as the primary dd_url,
// we will also inherit any additional API keys from the
// configuration of that site, meaning we'll send preaggr payloads
// for each of those orgs. Not sure if this is a problem or not.
```
💡 Verification agent
🧩 Analysis chain
Address potential duplicate payload sending and improve domain validation.
The PreaggrOnly destination handling logic is correct but has important concerns that need addressing:
- Duplicate sending issue: The TODO comment on lines 518-521 highlights a real problem - if preaggr_dd_url equals the primary dd_url, preaggregated payloads will be sent multiple times for each org due to inherited API keys.
- Configuration access: The direct config access with f.config.GetString("preaggr_dd_url") should ideally be cached or validated during forwarder initialization rather than on every transaction creation for performance.
Let's verify if this duplicate sending scenario is possible in the current configuration setup:
Consider adding validation to prevent the duplicate sending scenario:
```diff
 if payload.Destination == transaction.PreaggrOnly {
+	preaggURL := f.config.GetString("preaggr_dd_url")
+	if preaggURL == "" {
+		f.log.Warnf("PreaggrOnly payload skipped: preaggr_dd_url not configured")
+		continue
+	}
-	if drDomain != f.config.GetString("preaggr_dd_url") {
+	if drDomain != preaggURL {
 		continue
 	} else {
+		// Warn if preaggregation URL is same as primary to highlight potential duplicate sends
+		if preaggURL == f.config.GetString("dd_url") {
+			f.log.Warnf("preaggr_dd_url is same as dd_url - potential duplicate payload sending")
+		}
 		endpoint = endpoints.PreaggrSeriesEndpoint
 	}
 }
```

🏁 Script executed:

```shell
#!/bin/bash
# Check if preaggr_dd_url can be set to the same value as dd_url
# and if there are safeguards in the configuration setup

# Search for preaggr_dd_url configuration and validation
rg -A 10 -B 5 "preaggr_dd_url.*=|GetString.*preaggr_dd_url"

# Check for validation preventing duplicate URLs
ast-grep --pattern $'if $_ == $_ {
  $$$
}'
```

Length of output: 66119
Remove duplicate‐sending check (enforced in config utils) and cache URLs in DefaultForwarder
The pkg/config/utils/endpoints.go logic already skips adding a preaggregated endpoint when preaggr_dd_url == dd_url, so the “duplicate send” scenario cannot occur. However, repeatedly calling f.config.GetString(...) on every transaction impacts performance. Cache both URLs during forwarder initialization and simplify the PreaggrOnly block:
• File: comp/forwarder/defaultforwarder/default_forwarder.go
Lines: ~505–522
Suggested refactor:
```diff
 type DefaultForwarder struct {
-	config config.Config
+	config    config.Config
+	preaggURL string
+	ddURL     string
 	// …
 }

 func NewDefaultForwarder(cfg config.Config, …) *DefaultForwarder {
-	return &DefaultForwarder{config: cfg, …}
+	return &DefaultForwarder{
+		config:    cfg,
+		preaggURL: cfg.GetString("preaggr_dd_url"),
+		ddURL:     cfg.GetString("dd_url"),
+		// …
+	}
 }

 // inside transaction-creation loop
 if payload.Destination == transaction.PreaggrOnly {
-	if drDomain != f.config.GetString("preaggr_dd_url") {
+	if f.preaggURL == "" {
+		f.log.Warn("PreaggrOnly payload skipped: preaggr_dd_url not set")
+		continue
+	}
+	if drDomain != f.preaggURL {
 		continue
 	}
 	endpoint = endpoints.PreaggrSeriesEndpoint
 }
```

- Removed the redundant "duplicate send" warning (config utils enforces uniqueness).
- Cached both URLs on startup to eliminate repetitive GetString calls.
📝 Committable suggestion
```go
// --- in comp/forwarder/defaultforwarder/default_forwarder.go ---

// 1) Cache URLs in the struct
type DefaultForwarder struct {
	config    config.Config
	preaggURL string
	ddURL     string
	// …
}

func NewDefaultForwarder(cfg config.Config, …) *DefaultForwarder {
	return &DefaultForwarder{
		config:    cfg,
		preaggURL: cfg.GetString("preaggr_dd_url"),
		ddURL:     cfg.GetString("dd_url"),
		// …
	}
}

// … later, inside the transaction-creation loop …

// If the payload has a preaggr destination, but the current domain
// is not the configured preaggr site, do not create a transaction.
//
// If we have a preaggr payload and domain, switch from the standard
// series endpoint to the preaggr-specific series endpoint.
if payload.Destination == transaction.PreaggrOnly {
	if f.preaggURL == "" {
		f.log.Warn("PreaggrOnly payload skipped: preaggr_dd_url not set")
		continue
	}
	if drDomain != f.preaggURL {
		continue
	}
	endpoint = endpoints.PreaggrSeriesEndpoint
}
// TODO(?): If the preaggr_dd_url is the same as the primary dd_url,
// we will also inherit any additional API keys from the
// configuration of that site…
```
🤖 Prompt for AI Agents
In comp/forwarder/defaultforwarder/default_forwarder.go around lines 505 to 522,
remove the redundant duplicate-sending check for preaggr payloads since this is
already enforced in pkg/config/utils/endpoints.go. Instead of calling
f.config.GetString("preaggr_dd_url") on every transaction, cache both the
primary dd_url and preaggr_dd_url during DefaultForwarder initialization. Then
update the PreaggrOnly destination handling to use the cached URLs for domain
comparison, simplifying the logic and improving performance.
```go
failoverActiveForMRF, allowlistForMRF := s.getFailoverAllowlist()
failoverActiveForAutoscaling, allowlistForAutoscaling := s.getAutoscalingFailoverMetrics()
failoverActive := (failoverActiveForMRF && len(allowlistForMRF) > 0) || (failoverActiveForAutoscaling && len(allowlistForAutoscaling) > 0)
pipelines := make([]metricsserializer.Pipeline, 0, 4)
if failoverActive {
	// Default behavior, primary region only
	pipelines = append(pipelines, metricsserializer.Pipeline{
		FilterFunc:  func(series *metrics.Serie) bool { return true },
		Destination: transaction.PrimaryOnly,
	})

	// Filter for MRF
	pipelines = append(pipelines, metricsserializer.Pipeline{
		FilterFunc: func(s *metrics.Serie) bool {
			_, allowed := allowlistForMRF[s.Name]
			return allowed
		},
		Destination: transaction.SecondaryOnly,
	})

	// Filter for Autoscaling
	pipelines = append(pipelines, metricsserializer.Pipeline{
		FilterFunc: func(s *metrics.Serie) bool {
			_, allowed := allowlistForAutoscaling[s.Name]
			return allowed
		},
		Destination: transaction.LocalOnly,
	})
} else {
	// Default behavior, all regions
	pipelines = append(pipelines, metricsserializer.Pipeline{
		FilterFunc:  func(series *metrics.Serie) bool { return true },
		Destination: transaction.AllRegions,
	})
}
```
💡 Verification agent
🧩 Analysis chain
Verify failover pipeline logic correctness.
The pipeline-based approach is a significant improvement in terms of modularity and readability. However, I want to verify the failover logic is equivalent to the previous implementation.
The logic appears correct - it creates separate pipelines for primary (all series), secondary (MRF allowlist), and local (autoscaling allowlist) destinations when failover is active, otherwise sends all series to all regions.
🏁 Script executed:
```shell
#!/bin/bash
# Search for the original failover logic to compare with the new pipeline approach
rg -A 20 -B 5 "multi_region_failover.*enabled.*failover_metrics"

# Check for any existing tests that validate failover behavior
fd -e go -t f | xargs grep -l "failover.*test\|test.*failover" | head -5
```

Length of output: 5780
Verified failover pipeline logic matches original behavior
I’ve confirmed that the new pipeline-based code in pkg/serializer/serializer.go (around lines 362–396) uses the exact same flag checks and allowlists from getFailoverAllowlist() and getAutoscalingFailoverMetrics() as the prior implementation:
- When either MRF or Autoscaling failover is active (and their respective allowlists are non-empty), it emits three pipelines:
  • PrimaryOnly (all series)
  • SecondaryOnly (MRF allowlist)
  • LocalOnly (Autoscaling allowlist)
- Otherwise it emits a single AllRegions pipeline.
I did not find any existing tests that exercise these failover‐metrics pipelines. To fully validate this important behavior, please add unit tests covering:
• MRF-only failover routing
• Autoscaling-only failover routing
• Both failovers active
• No-failover (AllRegions) path
— especially around pkg/serializer/serializer.go lines 362–396.
🤖 Prompt for AI Agents
In pkg/serializer/serializer.go around lines 362 to 396, the failover pipeline
logic is correctly implemented but lacks unit test coverage. Add unit tests that
specifically cover the failover routing scenarios: MRF-only failover active,
Autoscaling-only failover active, both failovers active simultaneously, and the
no-failover case where all regions receive metrics. These tests should verify
that the correct pipelines with appropriate filters and destinations are created
for each scenario to ensure the failover behavior is fully validated.
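A rough table-driven shape for those tests follows; `buildPipelines` is a hypothetical extraction of the pipeline-construction logic quoted above and does not exist in the PR as written:

```go
func TestFailoverPipelineDestinations(t *testing.T) {
	cases := []struct {
		name             string
		mrfAllowlist     map[string]struct{}
		autoscalingAllow map[string]struct{}
		wantDestinations []transaction.Destination
	}{
		{
			name:             "no failover",
			wantDestinations: []transaction.Destination{transaction.AllRegions},
		},
		{
			name:         "MRF failover only",
			mrfAllowlist: map[string]struct{}{"metric.a": {}},
			wantDestinations: []transaction.Destination{
				transaction.PrimaryOnly, transaction.SecondaryOnly, transaction.LocalOnly,
			},
		},
		{
			name:             "autoscaling failover only",
			autoscalingAllow: map[string]struct{}{"metric.b": {}},
			wantDestinations: []transaction.Destination{
				transaction.PrimaryOnly, transaction.SecondaryOnly, transaction.LocalOnly,
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			pipelines := buildPipelines(tc.mrfAllowlist, tc.autoscalingAllow)
			if len(pipelines) != len(tc.wantDestinations) {
				t.Fatalf("got %d pipelines, want %d", len(pipelines), len(tc.wantDestinations))
			}
			for i, p := range pipelines {
				if p.Destination != tc.wantDestinations[i] {
					t.Errorf("pipeline %d: destination %v, want %v", i, p.Destination, tc.wantDestinations[i])
				}
			}
		})
	}
}
```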
```go
// We are modifying the series in this filter, so it must always be the last
// pipeline in the list to avoid affecting other pipelines.
if s.config.GetBool("enable_preaggr_pipeline") {
	pipelines = append(pipelines, metricsserializer.Pipeline{
		FilterFunc: func(s *metrics.Serie) bool {
			// TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
			hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
			s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
			return true
		},
		Destination: transaction.PreaggrOnly,
	})
}
```
💡 Verification agent
🧩 Analysis chain
Address host tag modification concerns in preaggregation pipeline.
The preaggregation pipeline implementation has several concerns that need attention:
- Side effect warning: The comment on lines 398-399 correctly notes this pipeline must be last since it modifies series data. This is a potential source of bugs if the pipeline order changes.
- TODO concern: Line 403 mentions not adding host tags if they were already added due to expected_tags_duration being set. This could lead to duplicate tags or inconsistent behavior.
- Context usage: Using context.TODO() on line 404 instead of a proper context might miss cancellation or timeout scenarios.
- Performance impact: slices.Clone() and tagset.CombineCompositeTagsAndSlice() are called for every series, which could be expensive for high-volume metrics.
Consider improving the implementation:
```diff
 // We are modifying the series in this filter, so it must always be the last
 // pipeline in the list to avoid affecting other pipelines.
 if s.config.GetBool("enable_preaggr_pipeline") {
+	// Cache host tags to avoid repeated retrieval
+	hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+
 	pipelines = append(pipelines, metricsserializer.Pipeline{
 		FilterFunc: func(s *metrics.Serie) bool {
-			// TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
-			hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+			// TODO: Check if host tags already present to avoid duplicates
 			s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
 			return true
 		},
 		Destination: transaction.PreaggrOnly,
 	})
 }
```

Let's check how expected_tags_duration interacts with host tag addition:
🏁 Script executed:
```shell
#!/bin/bash
# Search for expected_tags_duration usage and host tag handling
rg -A 10 -B 5 "expected_tags_duration"

# Check if there are existing mechanisms to detect duplicate host tags
ast-grep --pattern $'func $_(tags $_) $_ {
  $$$
}'
```

Length of output: 50317
Refactor preaggregation pipeline host-tag logic
The current implementation always fetches and clones host tags per-series (inside the FilterFunc), unconditionally adds them (risking duplicates when the standard host-tag provider is active), and uses context.TODO(). Let’s tighten this up:
• Cache & clone host tags once, outside the per-series closure, so you don’t call hostMetadataUtils.Get() on every metric.
• Replace context.TODO() with a real context (e.g. a field on your serializer or a passed-in ctx).
• Detect & skip already-present host tags to honor expected_tags_duration and avoid dupes.
• Ensure this pipeline remains last (to avoid unintended side effects).
Suggested diff in pkg/serializer/serializer.go:
```diff
 if s.config.GetBool("enable_preaggr_pipeline") {
-	pipelines = append(pipelines, metricsserializer.Pipeline{
+	// Preaggregated metrics need host tags once - cache at pipeline setup
+	ctx := s.ctx // or however you carry a real Context
+	hostTags := slices.Clone(hostMetadataUtils.Get(ctx, false, pkgconfigsetup.Datadog()).System)
+
+	pipelines = append(pipelines, metricsserializer.Pipeline{
 		FilterFunc: func(s *metrics.Serie) bool {
-			// TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
-			hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+			// skip if host tags already on the series
+			if containsAll(s.Tags, hostTags) {
+				return true
+			}
 			s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
 			return true
 		},
 		Destination: transaction.PreaggrOnly,
 	})
 }
```

You'll need to implement a small helper (e.g. containsAll([]string, []string) bool) to detect duplicates. This refactor eliminates per-series overhead, avoids duplicate tag addition, and uses a real context.
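One possible sketch of that helper, assuming plain string slices; the series tags here are composite tags, so the real signature would need to adapt:

```go
// containsAll reports whether every tag in want is already present in have.
func containsAll(have, want []string) bool {
	if len(want) == 0 {
		return true
	}
	set := make(map[string]struct{}, len(have))
	for _, tag := range have {
		set[tag] = struct{}{}
	}
	for _, tag := range want {
		if _, ok := set[tag]; !ok {
			return false
		}
	}
	return true
}
```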
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In pkg/serializer/serializer.go around lines 398 to 411, the preaggregation
pipeline currently fetches and clones host tags inside the per-series FilterFunc
using context.TODO(), which causes performance overhead and risks adding
duplicate host tags if they were already added due to expected_tags_duration. To
fix this, move the host tag fetching and cloning outside the FilterFunc so it
happens once, replace context.TODO() with a proper context passed into the
serializer or function, implement a helper function to check if all host tags
are already present in the series tags to avoid duplicates, and only add host
tags if they are missing. Also, keep the pipeline as the last in the list to
prevent side effects.
```go
payloads := make([]*transaction.BytesPayload, len(pbs))
for _, pb := range pbs {
	payloads = append(payloads, pb.payloads...)
}

return payloads, nil
```
Fix payload collection logic to avoid slice reallocation.
The payload collection has an efficiency issue:
The current logic initializes payloads with a fixed length but then uses append(), which leaves len(pbs) nil entries at the front of the slice (and forces a reallocation once the capacity is exceeded):
```diff
-payloads := make([]*transaction.BytesPayload, len(pbs))
+var payloads []*transaction.BytesPayload
 for _, pb := range pbs {
 	payloads = append(payloads, pb.payloads...)
 }
```

Alternatively, pre-calculate the total capacity:

```diff
+totalCapacity := 0
+for _, pb := range pbs {
+	totalCapacity += len(pb.payloads)
+}
+payloads := make([]*transaction.BytesPayload, 0, totalCapacity)
 for _, pb := range pbs {
 	payloads = append(payloads, pb.payloads...)
 }
```

🤖 Prompt for AI Agents
In pkg/serializer/internal/metrics/iterable_series.go around lines 163 to 168,
the code initializes the payloads slice with a fixed length but then uses
append, causing unnecessary reallocations and incorrect slice length. To fix
this, first calculate the total number of payloads across all pbs, then create
the payloads slice with that exact capacity and length, and finally copy the
payloads directly into the slice without using append.
What does this PR do?
Motivation
Describe how you validated your changes
Possible Drawbacks / Trade-offs
Additional Notes
Summary by CodeRabbit
New Features
Refactor
Chores