Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions lib/flagsmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require 'flagsmith/sdk/models/flags'
require 'flagsmith/sdk/models/segments'
require 'flagsmith/sdk/offline_handlers'
require 'flagsmith/sdk/realtime_client'

require 'flagsmith/engine/core'

Expand Down Expand Up @@ -46,6 +47,7 @@ class Client # rubocop:disable Metrics/ClassLength
# :environment_key, :api_url, :custom_headers, :request_timeout_seconds, :enable_local_evaluation,
# :environment_refresh_interval_seconds, :retries, :enable_analytics, :default_flag_handler,
# :offline_mode, :offline_handler, :polling_manager_failure_limit
# :realtime_api_url, :enable_realtime_updates, :logger
#
# You can see full description in the Flagsmith::Config

Expand All @@ -59,6 +61,7 @@ def initialize(config)
@identity_overrides_by_identifier = {}

validate_offline_mode!
validate_realtime_mode!

api_client
analytics_processor
Expand All @@ -78,10 +81,21 @@ def validate_offline_mode!
'Cannot use offline_handler and default_flag_handler at the same time.'
end

def validate_realtime_mode!
return unless @config.realtime_mode? && !@config.local_evaluation?

raise Flagsmith::ClientError,
'The enable_realtime_updates config param requires a matching enable_local_evaluation param.'
end

def api_client
@api_client ||= Flagsmith::ApiClient.new(@config)
end

def realtime_client
@realtime_client ||= Flagsmith::RealtimeClient.new(@config)
end

def engine
@engine ||= Flagsmith::Engine::Engine.new
end
Expand All @@ -104,6 +118,14 @@ def load_offline_handler
def environment_data_polling_manager
return nil unless @config.local_evaluation?

# Bypass the environment data polling manager if realtime
# is present in the configuration.
if @config.realtime_mode?
update_environment
realtime_client.listen self unless realtime_client.running
return
end

update_environment if @environment_data_polling_manager.nil?

@environment_data_polling_manager ||= Flagsmith::EnvironmentDataPollingManager.new(
Expand Down
15 changes: 14 additions & 1 deletion lib/flagsmith/sdk/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ module Flagsmith
# Config options shared around Engine
class Config
DEFAULT_API_URL = 'https://edge.api.flagsmith.com/api/v1/'
DEFAULT_REALTIME_API_URL = 'https://realtime.flagsmith.com/'

OPTIONS = %i[
environment_key api_url custom_headers request_timeout_seconds enable_local_evaluation
environment_refresh_interval_seconds retries enable_analytics default_flag_handler
offline_mode offline_handler polling_manager_failure_limit logger
offline_mode offline_handler polling_manager_failure_limit
realtime_api_url enable_realtime_updates logger
].freeze

# Available Configs
Expand Down Expand Up @@ -40,6 +43,9 @@ class Config
# the entire environment, project, flags, etc.
# +polling_manager_failure_limit+ - An integer to control how long to suppress errors in
# the polling manager for local evaluation mode.
# +realtime_api_url+ - Override the realtime api URL to communicate with a
# non-standard realtime endpoint.
# +enable_realtime_updates+ - A boolean to enable realtime updates.
# +logger+ - Pass your logger, default is Logger.new($stdout)
#
attr_reader(*OPTIONS)
Expand All @@ -62,6 +68,10 @@ def offline_mode?
@offline_mode
end

def realtime_mode?
@enable_realtime_updates
end

def environment_flags_url
'flags/'
end
Expand Down Expand Up @@ -92,6 +102,9 @@ def build_config(options)
@offline_mode = opts.fetch(:offline_mode, false)
@offline_handler = opts[:offline_handler]
@polling_manager_failure_limit = opts.fetch(:polling_manager_failure_limit, 10)
@realtime_api_url = opts.fetch(:realtime_api_url, Flagsmith::Config::DEFAULT_REALTIME_API_URL)
@realtime_api_url << '/' unless @realtime_api_url.end_with? '/'
@enable_realtime_updates = opts.fetch(:enable_realtime_updates, false)
@logger = options.fetch(:logger, Logger.new($stdout).tap { |l| l.level = :debug })
end
# rubocop:enable Metrics/AbcSize, Metrics/MethodLength
Expand Down
63 changes: 63 additions & 0 deletions lib/flagsmith/sdk/realtime_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

require 'logger'
require 'faraday'
require 'json'

module Flagsmith
# Ruby client for realtime access to flagsmith.com
class RealtimeClient
attr_accessor :running

def initialize(config)
@config = config
@thread = nil
@running = false
@main = nil
end

def endpoint
"#{@config.realtime_api_url}sse/environments/#{@main.environment.api_key}/stream"
end

def listen(main, remaining_attempts: Float::INFINITY, retry_interval: 0.5) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/MethodLength
last_updated_at = 0
@main = main
@running = true
@thread = Thread.new do
while @running && remaining_attempts.positive?
remaining_attempts -= 1
@config.logger.warn 'Beginning to pull down realtime endpoint'
begin
sleep retry_interval
# Open connection to SSE endpoint
Faraday.new(url: endpoint).get do |req|
req.options.timeout = nil # Keep connection alive indefinitely
req.options.open_timeout = 10
end.body.each_line do |line| # rubocop:disable Style/MultilineBlockChain
# SSE protocol: Skip non-event lines
next if line.strip.empty? || line.start_with?(':')

# Parse SSE fields
next unless line.start_with?('data: ')

data = JSON.parse(line[6..].strip)
updated_at = data['updated_at']
next unless updated_at > last_updated_at

@config.logger.info "Realtime updating environment from #{last_updated_at} to #{updated_at}"
@main.update_environment
last_updated_at = updated_at
end
rescue Faraday::ConnectionFailed, Faraday::TimeoutError => e
@config.logger.warn "Connection failed: #{e.message}. Retrying in #{retry_interval} seconds..."
rescue StandardError => e
@config.logger.error "Error: #{e.message}. Retrying in #{retry_interval} seconds..."
end
Comment on lines +52 to +56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't quite understand the reasoning behind different logging levels for different errors here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why we warn for a connection failed error is that it is expected behaviour. The SSE connection is, by design, going to fail after a poll of 30 seconds.

end
end

@running = false
end
end
end
103 changes: 103 additions & 0 deletions spec/sdk/realtime_client_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
require 'spec_helper'
require 'faraday'

RSpec.describe Flagsmith::RealtimeClient do
let(:mock_logger) { double('Logger', warn: nil, info: nil, error: nil) }
let(:mock_config) do
double('Config',
realtime_api_url: 'https://example.com/',
environment_key: 'test-environment',
logger: mock_logger)
end
let(:mock_environment) { double('Environment',
api_key: 'some_api_key' )}
let(:mock_main) { double('Main',
update_environment: nil,
environment: mock_environment,
) }
let(:realtime_client) { described_class.new(mock_config) }
let(:sse_response) do
<<~SSE
data: {"updated_at": 1}

data: {"updated_at": 2}
SSE
end
let(:retry_interval) { 0.01 }

before(:each) do
allow(Faraday).to receive(:new).and_return(double('Faraday::Connection', get: double('Response', body: sse_response)))
allow(Thread).to receive(:new).and_yield
end

describe '#listen' do
after { realtime_client.running = false }

it 'parses SSE data and calls update_environment when updated_at increases' do
expect(mock_main).to receive(:update_environment).twice
realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3)
end

it 'logs retries and continues on connection failure' do
allow(Faraday).to receive(:new).and_raise(Faraday::ConnectionFailed.new('Connection failed'))

expect(mock_logger).to receive(:warn).with(/Connection failed/).at_least(:once)
realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3)
end

it 'handles and logs unexpected errors gracefully' do
allow(Faraday).to receive(:new).and_raise(StandardError.new('Unexpected error'))

expect(mock_logger).to receive(:error).with(/Unexpected error/).at_least(:once)
realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3)
end

end
end

RSpec.describe Flagsmith::Client do
describe '#initialize' do
before do
# Mock the methods to avoid initialization interferring.
allow_any_instance_of(Flagsmith::Client).to receive(:api_client)
allow_any_instance_of(Flagsmith::Client).to receive(:analytics_processor)
allow_any_instance_of(Flagsmith::Client).to receive(:environment_data_polling_manager)
allow_any_instance_of(Flagsmith::Client).to receive(:engine)
allow_any_instance_of(Flagsmith::Client).to receive(:load_offline_handler)
end

context 'when realtime_mode is true and local_evaluation is false' do
it 'raises a Flagsmith::ClientError' do
config = double(
'Config',
realtime_mode?: true,
local_evaluation?: false,
offline_mode?: false,
offline_handler: nil,
)
allow(Flagsmith::Config).to receive(:new).and_return(config)

expect {
Flagsmith::Client.new(config)
}.to raise_error(Flagsmith::ClientError, 'The enable_realtime_updates config param requires a matching enable_local_evaluation param.')
end
end

context 'when realtime_mode is false or local_evaluation is true' do
it 'does not raise an exception' do
config = double(
'Config',
realtime_mode?: false,
local_evaluation?: true,
offline_mode?: false,
offline_handler: nil,
)
allow(Flagsmith::Config).to receive(:new).and_return(config)

expect {
Flagsmith::Client.new(config)
}.not_to raise_error
end
end
end
end