diff --git a/.travis.yml b/.travis.yml index 0949dc6a..c6d565ff 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,17 +2,9 @@ sudo: false language: ruby rvm: - - 2.0 - - 2.1 - - 2.2 - - 2.3 - - 2.4 - - 2.5 -script: bundle exec rake $TASK -env: - - TASK=spec -matrix: - include: - env: TASK=rubocop - rvm: 2.5 + - 2.6 + - 2.7 +script: + - bundle exec rubocop + - bundle exec rake spec bundler_args: --without=localdev diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cdc4060..08962cdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # CHANGELOG +## Unreleased + + * Try to make this thread safe + [//]: # (comment: Don't forget to update lib/datadog/statsd.rb:DogStatsd::Statsd::VERSION when releasing a new version) ## 4.8.0 / 2020.04.20 diff --git a/Gemfile b/Gemfile index 3dccdfed..542b50b6 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ gem 'minitest-matchers' gem 'yard', '~> 0.9.20' gem 'single_cov' gem 'climate_control' +gem 'concurrent-ruby', '~> 1.1.6' if RUBY_VERSION >= '2.0.0' gem 'rubocop', '~> 0.50.0' # bump this and TargetRubyVersion once we drop ruby 2.0 diff --git a/lib/datadog/statsd.rb b/lib/datadog/statsd.rb index 4cb97751..9bd81bbb 100644 --- a/lib/datadog/statsd.rb +++ b/lib/datadog/statsd.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true + require 'socket' require_relative 'statsd/version' @@ -42,6 +43,7 @@ class Statsd DISTRIBUTION_TYPE = 'd' TIMING_TYPE = 'ms' SET_TYPE = 's' + UNIT_MS = ['un:ms'].freeze # A namespace to prepend to all statsd calls. Defaults to no namespace. attr_reader :namespace @@ -110,6 +112,8 @@ def initialize( @sample_rate = sample_rate + @buffer = Concurrent::Array.new + # we reduce max_buffer_bytes by a the rough estimate of the telemetry payload @batch = Batch.new(connection, (max_buffer_bytes - telemetry.estimate_max_size)) end @@ -220,7 +224,13 @@ def distribution(stat, value, opts = EMPTY_OPTIONS) # @option opts [Array] :tags An array of tags def timing(stat, ms, opts = EMPTY_OPTIONS) opts = { sample_rate: opts } if opts.is_a?(Numeric) - send_stats(stat, ms, TIMING_TYPE, opts) + safe_tags = opts[:tags] || [] + if safe_tags.is_a?(Array) + safe_tags += UNIT_MS + elsif safe_tags.is_a?(Hash) + safe_tags[:un] = 'ms' + end + send_stats stat, ms, TIMING_TYPE, opts.merge(tags: safe_tags) end # Reports execution time of the provided block using {#timing}. diff --git a/lib/datadog/statsd/batch.rb b/lib/datadog/statsd/batch.rb index 84e12a66..f41f0c94 100644 --- a/lib/datadog/statsd/batch.rb +++ b/lib/datadog/statsd/batch.rb @@ -1,9 +1,16 @@ # frozen_string_literal: true +require 'concurrent' +require 'monitor' + module Datadog class Statsd class Batch + include MonitorMixin + def initialize(connection, max_buffer_bytes) + super() + @connection = connection @max_buffer_bytes = max_buffer_bytes @depth = 0 @@ -11,11 +18,10 @@ def initialize(connection, max_buffer_bytes) end def open - @depth += 1 - + synchronize { @depth += 1 } yield ensure - @depth -= 1 + synchronize { @depth -= 1 } flush if !open? end @@ -24,19 +30,21 @@ def open? end def add(message) - message_bytes = message.bytesize - - unless @buffer_bytes == 0 - if @buffer_bytes + 1 + message_bytes >= @max_buffer_bytes - flush - else - @buffer << "\n" - @buffer_bytes += 1 + synchronize do + message_bytes = message.bytesize + + unless @buffer_bytes == 0 + if @buffer_bytes + 1 + message_bytes >= @max_buffer_bytes + flush + else + @buffer << "\n" + @buffer_bytes += 1 + end end - end - @buffer << message - @buffer_bytes += message_bytes + @buffer << message + @buffer_bytes += message_bytes + end end def flush diff --git a/lib/datadog/statsd/connection.rb b/lib/datadog/statsd/connection.rb index a98bd88d..dc2d3df5 100644 --- a/lib/datadog/statsd/connection.rb +++ b/lib/datadog/statsd/connection.rb @@ -1,9 +1,15 @@ # frozen_string_literal: true +require 'concurrent' +require 'monitor' + module Datadog class Statsd class Connection + include MonitorMixin + def initialize(telemetry) + super() @telemetry = telemetry end @@ -18,36 +24,40 @@ def close end def write(payload) - logger.debug { "Statsd: #{payload}" } if logger + synchronize do + begin + logger.debug { "Statsd: #{payload}" } if logger - flush_telemetry = telemetry.flush? + flush_telemetry = telemetry.flush? - payload += telemetry.flush if flush_telemetry + payload += telemetry.flush if flush_telemetry - send_message(payload) + send_message(payload) - telemetry.reset if flush_telemetry + telemetry.reset if flush_telemetry - telemetry.sent(packets: 1, bytes: payload.length) - rescue StandardError => boom - # Try once to reconnect if the socket has been closed - retries ||= 1 - if retries <= 1 && - (boom.is_a?(Errno::ENOTCONN) or - boom.is_a?(Errno::ECONNREFUSED) or - boom.is_a?(IOError) && boom.message =~ /closed stream/i) - retries += 1 - begin - close - retry - rescue StandardError => e - boom = e + telemetry.sent(packets: 1, bytes: payload.length) + rescue StandardError => boom + # Try once to reconnect if the socket has been closed + retries ||= 1 + if retries <= 1 && + (boom.is_a?(Errno::ENOTCONN) or + boom.is_a?(Errno::ECONNREFUSED) or + boom.is_a?(IOError) && boom.message =~ /closed stream/i) + retries += 1 + begin + close + retry + rescue StandardError => e + boom = e + end + end + + telemetry.dropped(packets: 1, bytes: payload.length) + logger.error { "Statsd: #{boom.class} #{boom}" } if logger + nil end end - - telemetry.dropped(packets: 1, bytes: payload.length) - logger.error { "Statsd: #{boom.class} #{boom}" } if logger - nil end private diff --git a/lib/datadog/statsd/udp_connection.rb b/lib/datadog/statsd/udp_connection.rb index 166b24e4..28cf411c 100644 --- a/lib/datadog/statsd/udp_connection.rb +++ b/lib/datadog/statsd/udp_connection.rb @@ -1,10 +1,14 @@ # frozen_string_literal: true +require 'concurrent' +require 'monitor' require_relative 'connection' module Datadog class Statsd class UDPConnection < Connection + include MonitorMixin + DEFAULT_HOST = '127.0.0.1' DEFAULT_PORT = 8125 @@ -25,7 +29,7 @@ def initialize(host, port, logger, telemetry) def connect UDPSocket.new.tap do |socket| - socket.connect(host, port) + synchronize { socket.connect(host, port) } end end diff --git a/lib/datadog/statsd/version.rb b/lib/datadog/statsd/version.rb index 95d4e31d..fc729720 100644 --- a/lib/datadog/statsd/version.rb +++ b/lib/datadog/statsd/version.rb @@ -4,6 +4,6 @@ module Datadog class Statsd - VERSION = '4.8.0' + VERSION = '4.8.1' end end diff --git a/spec/integrations/allocation_spec.rb b/spec/integrations/allocation_spec.rb index 223d3472..e3ce988f 100644 --- a/spec/integrations/allocation_spec.rb +++ b/spec/integrations/allocation_spec.rb @@ -4,7 +4,7 @@ describe 'Allocations and garbage collection' do before do - skip 'Ruby too old' if RUBY_VERSION < '2.3.0' + skip 'Ruby too old' if RUBY_VERSION < '2.6.0' end let(:socket) { FakeUDPSocket.new } @@ -42,12 +42,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 18 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 17 else - 16 + 15 end end @@ -69,12 +67,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 9 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 8 else - 7 + 6 end end @@ -87,12 +83,10 @@ context 'with tags' do let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 26 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 25 else - 24 + 23 end end @@ -111,12 +105,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 18 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' - 17 + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' + 24 else - 16 + 22 end end @@ -138,12 +130,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 9 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' - 8 + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' + 15 else - 7 + 13 end end @@ -156,12 +146,10 @@ context 'with tags' do let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 26 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' - 25 + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' + 31 else - 24 + 29 end end @@ -180,12 +168,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 19 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 18 else - 17 + 16 end end @@ -207,12 +193,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 10 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 9 else - 8 + 7 end end @@ -225,12 +209,10 @@ context 'with tags' do let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 28 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 27 else - 26 + 25 end end @@ -249,12 +231,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 15 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 14 else - 13 + 12 end end @@ -276,12 +256,10 @@ end let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 6 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 5 else - 4 + 3 end end @@ -294,12 +272,10 @@ context 'with tags' do let(:expected_allocations) do - if RUBY_VERSION < '2.4.0' - 24 - elsif RUBY_VERSION >= '2.4.0' && RUBY_VERSION < '2.5.0' + if RUBY_VERSION >= '2.6.6' && RUBY_VERSION < '2.7.0' 23 else - 22 + 21 end end @@ -310,4 +286,4 @@ end end end -end \ No newline at end of file +end diff --git a/spec/shared/metrics_method.rb b/spec/shared/metrics_method.rb index aa672181..66a969b4 100644 --- a/spec/shared/metrics_method.rb +++ b/spec/shared/metrics_method.rb @@ -8,3 +8,13 @@ it_behaves_like 'a log debuggable method', normal_expected_result it_behaves_like 'a taggable method', normal_expected_result, telemetry_options end + +RSpec.shared_examples 'a metrics method with timing' do |normal_expected_result, telemetry_options| + let(:action_tags) { [] } + + it_behaves_like 'a namespaceable method', "#{normal_expected_result}|#un:ms" + + it_behaves_like 'a log debuggable method', "#{normal_expected_result}|#un:ms" + + it_behaves_like 'a taggable method', normal_expected_result, telemetry_options, ',un:ms' +end diff --git a/spec/shared/namespaceable_method.rb b/spec/shared/namespaceable_method.rb index 56e9b0d3..6950fb4a 100644 --- a/spec/shared/namespaceable_method.rb +++ b/spec/shared/namespaceable_method.rb @@ -19,4 +19,4 @@ expect(socket.recv[0]).to eq_with_telemetry("yolo.#{normal_expected_result}") end end -end \ No newline at end of file +end diff --git a/spec/shared/taggable_method.rb b/spec/shared/taggable_method.rb index bb79b0e1..4d19c17e 100644 --- a/spec/shared/taggable_method.rb +++ b/spec/shared/taggable_method.rb @@ -1,5 +1,5 @@ -RSpec.shared_examples 'a taggable method' do |normal_expected_result, telemetry_options| +RSpec.shared_examples 'a taggable method' do |normal_expected_result, telemetry_options,timing| telemetry_options ||= {} context 'when tags are an array of strings' do @@ -10,7 +10,7 @@ it 'supports tags' do basic_action - expect(socket.recv[0]).to eq_with_telemetry "#{normal_expected_result}|#country:usa,state:ny,other", telemetry_options + expect(socket.recv[0]).to eq_with_telemetry "#{normal_expected_result}|#country:usa,state:ny,other#{timing}", telemetry_options end context 'when there is global tags' do @@ -21,7 +21,7 @@ it 'merges global and provided tags' do basic_action - expect(socket.recv[0]).to match(/^#{normal_expected_result}|#global_tag,country:usa,state:ny,other/) + expect(socket.recv[0]).to match(/^#{normal_expected_result}|#global_tag,country:usa,state:ny,other#{timing}/) end end end @@ -37,7 +37,7 @@ it 'supports tags' do basic_action - expect(socket.recv[0]).to eq_with_telemetry "#{normal_expected_result}|#country:usa,state:ny", telemetry_options + expect(socket.recv[0]).to eq_with_telemetry "#{normal_expected_result}|#country:usa,state:ny#{timing}", telemetry_options end context 'when there is global tags' do diff --git a/spec/statsd/version_spec.rb b/spec/statsd/version_spec.rb index 455ecab4..9f4a0b4c 100644 --- a/spec/statsd/version_spec.rb +++ b/spec/statsd/version_spec.rb @@ -3,7 +3,7 @@ describe Datadog::Statsd do describe 'VERSION' do it 'has a version' do - expect(Datadog::Statsd::VERSION).to eq '4.8.0' + expect(Datadog::Statsd::VERSION).to eq '4.8.1' end end -end \ No newline at end of file +end diff --git a/spec/statsd_spec.rb b/spec/statsd_spec.rb index 1ccbf53b..c4adebe5 100644 --- a/spec/statsd_spec.rb +++ b/spec/statsd_spec.rb @@ -463,8 +463,9 @@ let(:namespace) { nil } let(:sample_rate) { nil } let(:tags) { nil } + let(:action_tags) { [] } - it_behaves_like 'a metrics method', 'foobar:500|ms' do + it_behaves_like 'a metrics method with timing', 'foobar:500|ms' do let(:basic_action) do subject.timing('foobar', 500, tags: action_tags) end @@ -472,7 +473,7 @@ it 'sends the timing' do subject.timing('foobar', 500) - expect(socket.recv[0]).to eq_with_telemetry 'foobar:500|ms' + expect(socket.recv[0]).to eq_with_telemetry 'foobar:500|ms|#un:ms' end context 'with a sample rate' do @@ -482,7 +483,7 @@ it 'sends the timing with the sample rate' do subject.timing('foobar', 500, sample_rate: 0.5) - expect(socket.recv[0]).to eq_with_telemetry 'foobar:500|ms|@0.5' + expect(socket.recv[0]).to eq_with_telemetry 'foobar:500|ms|@0.5|#un:ms' end end @@ -493,7 +494,7 @@ it 'sends the timing with the sample rate' do subject.timing('foobar', 500, 0.5) - expect(socket.recv[0]).to eq_with_telemetry 'foobar:500|ms|@0.5' + expect(socket.recv[0]).to eq_with_telemetry 'foobar:500|ms|@0.5|#un:ms' end end end @@ -516,7 +517,7 @@ allow(Process).to receive(:clock_gettime).and_return(0) if Datadog::Statsd::PROCESS_TIME_SUPPORTED end - it_behaves_like 'a metrics method', 'foobar:1000|ms' do + it_behaves_like 'a metrics method with timing', 'foobar:1000|ms' do let(:basic_action) do subject.time('foobar', tags: action_tags) do Timecop.travel(after_date) @@ -532,7 +533,7 @@ allow(Process).to receive(:clock_gettime).and_return(1) if Datadog::Statsd::PROCESS_TIME_SUPPORTED end - expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms' + expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms|#un:ms' end it 'ensures the timing is sent' do @@ -544,7 +545,7 @@ end rescue nil # rubocop:enable Lint/RescueWithoutErrorClass - expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms' + expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms|#un:ms' end end @@ -579,7 +580,7 @@ allow(Process).to receive(:clock_gettime).and_return(1) if Datadog::Statsd::PROCESS_TIME_SUPPORTED end - expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms|@0.5' + expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms|@0.5|#un:ms' end end @@ -594,7 +595,7 @@ allow(Process).to receive(:clock_gettime).and_return(1) if Datadog::Statsd::PROCESS_TIME_SUPPORTED end - expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms|@0.5' + expect(socket.recv[0]).to eq_with_telemetry 'foobar:1000|ms|@0.5|#un:ms' end end end @@ -996,10 +997,10 @@ def subject.telemetry expect(socket.recv[0]).to eq_with_telemetry('test:21|h', metrics: 1, packets_sent: 1, bytes_sent: 683) subject.timing('test', 21) - expect(socket.recv[0]).to eq_with_telemetry('test:21|ms', metrics: 1, packets_sent: 1, bytes_sent: 683) + expect(socket.recv[0]).to eq_with_telemetry('test:21|ms|#un:ms', metrics: 1, packets_sent: 1, bytes_sent: 683) subject.set('test', 21) - expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 684) + expect(socket.recv[0]).to eq_with_telemetry('test:21|s', metrics: 1, packets_sent: 1, bytes_sent: 691) subject.service_check('sc', 0) expect(socket.recv[0]).to eq_with_telemetry('_sc|sc|0', metrics: 0, service_checks: 1, packets_sent: 1, bytes_sent: 683) @@ -1029,13 +1030,13 @@ def subject.telemetry s.event('ev', 'text') end - expect(socket.recv[0]).to eq_with_telemetry("test:1|c\ntest:-1|c\ntest:21|c\ntest:21|g\ntest:21|h\ntest:21|ms\ntest:21|s\n_sc|sc|0\n_e{2,4}:ev|text", + expect(socket.recv[0]).to eq_with_telemetry("test:1|c\ntest:-1|c\ntest:21|c\ntest:21|g\ntest:21|h\ntest:21|ms|#un:ms\ntest:21|s\n_sc|sc|0\n_e{2,4}:ev|text", metrics: 7, service_checks: 1, events: 1 ) - expect(subject.telemetry.flush).to eq_with_telemetry('', metrics: 0, service_checks: 0, events: 0, packets_sent: 1, bytes_sent: 766) + expect(subject.telemetry.flush).to eq_with_telemetry('', metrics: 0, service_checks: 0, events: 0, packets_sent: 1, bytes_sent: 773) end end @@ -1131,4 +1132,4 @@ class Datadog::Statsd::SomeClass; end expect(socket.recv[0]).to eq_with_telemetry 'stat:1|c|#yolo' end end -end \ No newline at end of file +end