diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 022894d6..b58406fb 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -1,7 +1,9 @@ # frozen_string_literal: false -require 'socketry' +require 'socket' +require 'openssl' require 'uri' +require 'timeout' module SplitIoClient module SSE @@ -36,12 +38,12 @@ def initialize(config, def close(status = nil) unless connected? - @config.logger.error('SSEClient already disconected.') if @config.debug_enabled + log_if_debug('SSEClient already disconected.', 3) return end @connected.make_false - @socket&.close + @socket.close push_status(status) rescue StandardError => e @config.logger.error("SSEClient close Error: #{e.inspect}") @@ -74,30 +76,40 @@ def connected? def connect_thread(latch) @config.threads[:connect_stream] = Thread.new do - @config.logger.info('Starting connect_stream thread ...') if @config.debug_enabled + log_if_debug('Starting connect_stream thread ...', 2) new_status = connect_stream(latch) push_status(new_status) - @config.logger.info('connect_stream thread finished.') if @config.debug_enabled + log_if_debug('connect_stream thread finished.', 2) end end def connect_stream(latch) return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch) - while connected? || @first_event.value begin - partial_data = @socket.readpartial(10_000, timeout: @read_timeout) - + partial_data = "" + Timeout::timeout @read_timeout do + partial_data = @socket.readpartial(10_000) + end read_first_event(partial_data, latch) raise 'eof exception' if partial_data == :eof + rescue Timeout::Error => e + log_if_debug("SSE read operation timed out!: #{e.inspect}", 3) + return Constants::PUSH_RETRYABLE_ERROR + rescue EOFError + raise 'eof exception' + rescue Errno::EAGAIN => e + log_if_debug("SSE client transient error: #{e.inspect}", 1) + IO.select([tcp_socket]) + retry rescue Errno::EBADF, IOError => e - @config.logger.error(e.inspect) if @config.debug_enabled + log_if_debug(e.inspect, 3) return nil rescue StandardError => e return nil if ENV['SPLITCLIENT_ENV'] == 'test' - @config.logger.error("Error reading partial data: #{e.inspect}") if @config.debug_enabled + log_if_debug("Error reading partial data: #{e.inspect}", 3) return Constants::PUSH_RETRYABLE_ERROR end @@ -109,10 +121,10 @@ def connect_stream(latch) def socket_write(latch) @first_event.make_true @socket = socket_connect - @socket.write(build_request(@uri)) + @socket.puts(build_request(@uri)) true rescue StandardError => e - @config.logger.error("Error during connecting to #{@uri.host}. Error: #{e.inspect}") + log_if_debug("Error during connecting to #{@uri.host}. Error: #{e.inspect}", 3) latch.count_down false end @@ -138,15 +150,28 @@ def read_first_event(data, latch) end def socket_connect - return Socketry::SSL::Socket.connect(@uri.host, @uri.port) if @uri.scheme.casecmp('https').zero? + tcp_socket = TCPSocket.new(@uri.host, @uri.port) + if @uri.scheme.casecmp('https').zero? + begin + ssl_context = OpenSSL::SSL::SSLContext.new + ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) + ssl_socket.hostname = @uri.host + ssl_socket.connect + return ssl_socket.connect + rescue Exception => e + @config.logger.error("socket connect error: #{e.inspect}") + puts e.inspect + return nil + end + end - Socketry::TCP::Socket.connect(@uri.host, @uri.port) + tcp_socket end def process_data(partial_data) return if partial_data.nil? || partial_data == KEEP_ALIVE_RESPONSE - @config.logger.debug("Event partial data: #{partial_data}") if @config.debug_enabled + log_if_debug("Event partial data: #{partial_data}", 1) events = @event_parser.parse(partial_data) events.each { |event| process_event(event) } rescue StandardError => e @@ -162,7 +187,7 @@ def build_request(uri) req << "SplitSDKMachineName: #{@config.machine_name}\r\n" req << "SplitSDKClientKey: #{@api_key.split(//).last(4).join}\r\n" unless @api_key.nil? req << "Cache-Control: no-cache\r\n\r\n" - @config.logger.debug("Request info: #{req}") if @config.debug_enabled + log_if_debug("Request info: #{req}", 1) req end @@ -200,6 +225,19 @@ def push_status(status) @config.logger.debug("Pushing new sse status: #{status}") @status_queue.push(status) end + + def log_if_debug(text, level) + if @config.debug_enabled + case level + when 1 + @config.logger.debug(text) + when 2 + @config.logger.info(text) + else + @config.logger.error(text) + end + end + end end end end diff --git a/lib/splitclient-rb/version.rb b/lib/splitclient-rb/version.rb index 1afbdd0a..e7470b9c 100644 --- a/lib/splitclient-rb/version.rb +++ b/lib/splitclient-rb/version.rb @@ -1,3 +1,3 @@ module SplitIoClient - VERSION = '8.9.0' + VERSION = '8.10.0-rc1' end diff --git a/splitclient-rb.gemspec b/splitclient-rb.gemspec index 3c818df9..742c6d3f 100644 --- a/splitclient-rb.gemspec +++ b/splitclient-rb.gemspec @@ -59,6 +59,5 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'lru_redux', '~> 1.1' spec.add_runtime_dependency 'net-http-persistent', '>= 2.9', '< 5.0' spec.add_runtime_dependency 'redis', '>= 4.0.0', '< 6.0' - spec.add_runtime_dependency 'socketry', '>= 0.4', '< 1.0' spec.add_runtime_dependency 'thread_safe', '~> 0.3' end