diff --git a/lib/flagsmith.rb b/lib/flagsmith.rb index c8dea68..5bcef1e 100644 --- a/lib/flagsmith.rb +++ b/lib/flagsmith.rb @@ -16,6 +16,7 @@ require 'flagsmith/sdk/models/flags' require 'flagsmith/sdk/models/segments' require 'flagsmith/sdk/offline_handlers' +require 'flagsmith/sdk/realtime_client' require 'flagsmith/engine/core' @@ -46,6 +47,7 @@ class Client # rubocop:disable Metrics/ClassLength # :environment_key, :api_url, :custom_headers, :request_timeout_seconds, :enable_local_evaluation, # :environment_refresh_interval_seconds, :retries, :enable_analytics, :default_flag_handler, # :offline_mode, :offline_handler, :polling_manager_failure_limit + # :realtime_api_url, :enable_realtime_updates, :logger # # You can see full description in the Flagsmith::Config @@ -59,6 +61,7 @@ def initialize(config) @identity_overrides_by_identifier = {} validate_offline_mode! + validate_realtime_mode! api_client analytics_processor @@ -78,10 +81,21 @@ def validate_offline_mode! 'Cannot use offline_handler and default_flag_handler at the same time.' end + def validate_realtime_mode! + return unless @config.realtime_mode? && !@config.local_evaluation? + + raise Flagsmith::ClientError, + 'The enable_realtime_updates config param requires a matching enable_local_evaluation param.' + end + def api_client @api_client ||= Flagsmith::ApiClient.new(@config) end + def realtime_client + @realtime_client ||= Flagsmith::RealtimeClient.new(@config) + end + def engine @engine ||= Flagsmith::Engine::Engine.new end @@ -104,6 +118,14 @@ def load_offline_handler def environment_data_polling_manager return nil unless @config.local_evaluation? + # Bypass the environment data polling manager if realtime + # is present in the configuration. + if @config.realtime_mode? + update_environment + realtime_client.listen self unless realtime_client.running + return + end + update_environment if @environment_data_polling_manager.nil? @environment_data_polling_manager ||= Flagsmith::EnvironmentDataPollingManager.new( diff --git a/lib/flagsmith/sdk/config.rb b/lib/flagsmith/sdk/config.rb index 049a537..172201c 100644 --- a/lib/flagsmith/sdk/config.rb +++ b/lib/flagsmith/sdk/config.rb @@ -4,10 +4,13 @@ module Flagsmith # Config options shared around Engine class Config DEFAULT_API_URL = 'https://edge.api.flagsmith.com/api/v1/' + DEFAULT_REALTIME_API_URL = 'https://realtime.flagsmith.com/' + OPTIONS = %i[ environment_key api_url custom_headers request_timeout_seconds enable_local_evaluation environment_refresh_interval_seconds retries enable_analytics default_flag_handler - offline_mode offline_handler polling_manager_failure_limit logger + offline_mode offline_handler polling_manager_failure_limit + realtime_api_url enable_realtime_updates logger ].freeze # Available Configs @@ -40,6 +43,9 @@ class Config # the entire environment, project, flags, etc. # +polling_manager_failure_limit+ - An integer to control how long to suppress errors in # the polling manager for local evaluation mode. + # +realtime_api_url+ - Override the realtime api URL to communicate with a + # non-standard realtime endpoint. + # +enable_realtime_updates+ - A boolean to enable realtime updates. # +logger+ - Pass your logger, default is Logger.new($stdout) # attr_reader(*OPTIONS) @@ -62,6 +68,10 @@ def offline_mode? @offline_mode end + def realtime_mode? + @enable_realtime_updates + end + def environment_flags_url 'flags/' end @@ -92,6 +102,9 @@ def build_config(options) @offline_mode = opts.fetch(:offline_mode, false) @offline_handler = opts[:offline_handler] @polling_manager_failure_limit = opts.fetch(:polling_manager_failure_limit, 10) + @realtime_api_url = opts.fetch(:realtime_api_url, Flagsmith::Config::DEFAULT_REALTIME_API_URL) + @realtime_api_url << '/' unless @realtime_api_url.end_with? '/' + @enable_realtime_updates = opts.fetch(:enable_realtime_updates, false) @logger = options.fetch(:logger, Logger.new($stdout).tap { |l| l.level = :debug }) end # rubocop:enable Metrics/AbcSize, Metrics/MethodLength diff --git a/lib/flagsmith/sdk/realtime_client.rb b/lib/flagsmith/sdk/realtime_client.rb new file mode 100644 index 0000000..58bd717 --- /dev/null +++ b/lib/flagsmith/sdk/realtime_client.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require 'logger' +require 'faraday' +require 'json' + +module Flagsmith + # Ruby client for realtime access to flagsmith.com + class RealtimeClient + attr_accessor :running + + def initialize(config) + @config = config + @thread = nil + @running = false + @main = nil + end + + def endpoint + "#{@config.realtime_api_url}sse/environments/#{@main.environment.api_key}/stream" + end + + def listen(main, remaining_attempts: Float::INFINITY, retry_interval: 0.5) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/MethodLength + last_updated_at = 0 + @main = main + @running = true + @thread = Thread.new do + while @running && remaining_attempts.positive? + remaining_attempts -= 1 + @config.logger.warn 'Beginning to pull down realtime endpoint' + begin + sleep retry_interval + # Open connection to SSE endpoint + Faraday.new(url: endpoint).get do |req| + req.options.timeout = nil # Keep connection alive indefinitely + req.options.open_timeout = 10 + end.body.each_line do |line| # rubocop:disable Style/MultilineBlockChain + # SSE protocol: Skip non-event lines + next if line.strip.empty? || line.start_with?(':') + + # Parse SSE fields + next unless line.start_with?('data: ') + + data = JSON.parse(line[6..].strip) + updated_at = data['updated_at'] + next unless updated_at > last_updated_at + + @config.logger.info "Realtime updating environment from #{last_updated_at} to #{updated_at}" + @main.update_environment + last_updated_at = updated_at + end + rescue Faraday::ConnectionFailed, Faraday::TimeoutError => e + @config.logger.warn "Connection failed: #{e.message}. Retrying in #{retry_interval} seconds..." + rescue StandardError => e + @config.logger.error "Error: #{e.message}. Retrying in #{retry_interval} seconds..." + end + end + end + + @running = false + end + end +end diff --git a/spec/sdk/realtime_client_spec.rb b/spec/sdk/realtime_client_spec.rb new file mode 100644 index 0000000..162deed --- /dev/null +++ b/spec/sdk/realtime_client_spec.rb @@ -0,0 +1,103 @@ +require 'spec_helper' +require 'faraday' + +RSpec.describe Flagsmith::RealtimeClient do + let(:mock_logger) { double('Logger', warn: nil, info: nil, error: nil) } + let(:mock_config) do + double('Config', + realtime_api_url: 'https://example.com/', + environment_key: 'test-environment', + logger: mock_logger) + end + let(:mock_environment) { double('Environment', + api_key: 'some_api_key' )} + let(:mock_main) { double('Main', + update_environment: nil, + environment: mock_environment, + ) } + let(:realtime_client) { described_class.new(mock_config) } + let(:sse_response) do + <<~SSE + data: {"updated_at": 1} + + data: {"updated_at": 2} + SSE + end + let(:retry_interval) { 0.01 } + + before(:each) do + allow(Faraday).to receive(:new).and_return(double('Faraday::Connection', get: double('Response', body: sse_response))) + allow(Thread).to receive(:new).and_yield + end + + describe '#listen' do + after { realtime_client.running = false } + + it 'parses SSE data and calls update_environment when updated_at increases' do + expect(mock_main).to receive(:update_environment).twice + realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3) + end + + it 'logs retries and continues on connection failure' do + allow(Faraday).to receive(:new).and_raise(Faraday::ConnectionFailed.new('Connection failed')) + + expect(mock_logger).to receive(:warn).with(/Connection failed/).at_least(:once) + realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3) + end + + it 'handles and logs unexpected errors gracefully' do + allow(Faraday).to receive(:new).and_raise(StandardError.new('Unexpected error')) + + expect(mock_logger).to receive(:error).with(/Unexpected error/).at_least(:once) + realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3) + end + + end +end + +RSpec.describe Flagsmith::Client do + describe '#initialize' do + before do + # Mock the methods to avoid initialization interferring. + allow_any_instance_of(Flagsmith::Client).to receive(:api_client) + allow_any_instance_of(Flagsmith::Client).to receive(:analytics_processor) + allow_any_instance_of(Flagsmith::Client).to receive(:environment_data_polling_manager) + allow_any_instance_of(Flagsmith::Client).to receive(:engine) + allow_any_instance_of(Flagsmith::Client).to receive(:load_offline_handler) + end + + context 'when realtime_mode is true and local_evaluation is false' do + it 'raises a Flagsmith::ClientError' do + config = double( + 'Config', + realtime_mode?: true, + local_evaluation?: false, + offline_mode?: false, + offline_handler: nil, + ) + allow(Flagsmith::Config).to receive(:new).and_return(config) + + expect { + Flagsmith::Client.new(config) + }.to raise_error(Flagsmith::ClientError, 'The enable_realtime_updates config param requires a matching enable_local_evaluation param.') + end + end + + context 'when realtime_mode is false or local_evaluation is true' do + it 'does not raise an exception' do + config = double( + 'Config', + realtime_mode?: false, + local_evaluation?: true, + offline_mode?: false, + offline_handler: nil, + ) + allow(Flagsmith::Config).to receive(:new).and_return(config) + + expect { + Flagsmith::Client.new(config) + }.not_to raise_error + end + end + end +end