From 1e258bbea474a1afb6a64213aaeb564d19305bfe Mon Sep 17 00:00:00 2001 From: Jimmy Chuang Date: Wed, 24 Jan 2018 15:32:36 -0800 Subject: [PATCH 1/3] first implementation of stream type collector --- cmd/agent.go | 6 +- cmd/task.go | 91 ++++++++++-- conf/{task-test.json => ddagent-task.json} | 7 +- conf/tasks.json | 68 --------- dockerfiles/local/Dockerfile | 2 +- pkg/collector/ddagent/ddagent.go | 114 +++++++++++++++ pkg/collector/ddagent/ddagent_test.go | 14 ++ pkg/collector/ddagent/protocol/receiver.go | 25 ++++ pkg/collector/ddagent/protocol/tcp.go | 160 +++++++++++++++++++++ pkg/collector/ddagent/protocol/tcp_test.go | 116 +++++++++++++++ pkg/collector/ddagent/types.go | 8 ++ pkg/collector/factory.go | 13 ++ pkg/common/util.go | 7 + 13 files changed, 541 insertions(+), 90 deletions(-) rename conf/{task-test.json => ddagent-task.json} (73%) delete mode 100644 conf/tasks.json create mode 100644 pkg/collector/ddagent/ddagent.go create mode 100644 pkg/collector/ddagent/ddagent_test.go create mode 100644 pkg/collector/ddagent/protocol/receiver.go create mode 100644 pkg/collector/ddagent/protocol/tcp.go create mode 100644 pkg/collector/ddagent/protocol/tcp_test.go create mode 100644 pkg/collector/ddagent/types.go create mode 100644 pkg/common/util.go diff --git a/cmd/agent.go b/cmd/agent.go index cca7141..dcc90d2 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -112,7 +112,7 @@ func (nodeAgent *NodeAgent) CreateTask(task *common.NodeTask) error { } } - newTask, err := NewHyperpilotTask(task, task.Id, metricTypes, + newTask, err := NewTask(task, task.Id, metricTypes, taskCollector, taskProcessor, nodeAgent) if err != nil { return errors.New(fmt.Sprintf("unable to new agent task {%s}: %s", task.Id, err.Error())) @@ -122,7 +122,7 @@ func (nodeAgent *NodeAgent) CreateTask(task *common.NodeTask) error { log.Warnf("Task id {%s} is duplicated, skip this task", task.Id) return nil } - nodeAgent.Tasks[task.Id] = newTask + nodeAgent.Tasks[task.Id] = &newTask return nil } @@ -146,7 +146,7 @@ func (nodeAgent *NodeAgent) CreatePublisher(p *common.Publish) error { func (nodeAgent *NodeAgent) Run() { for _, task := range nodeAgent.Tasks { - task.Run() + (*task).Run() } } diff --git a/cmd/task.go b/cmd/task.go index 5cf2224..9f28459 100644 --- a/cmd/task.go +++ b/cmd/task.go @@ -14,10 +14,9 @@ import ( log "github.com/sirupsen/logrus" ) -type HyperpilotTask struct { +type BaseTask struct { Task *common.NodeTask Id string - Collector collector.Collector Processor processor.Processor Publisher []*HyperpilotPublisher CollectMetrics []snap.Metric @@ -25,13 +24,27 @@ type HyperpilotTask struct { Agent *NodeAgent } -func NewHyperpilotTask( - task *common.NodeTask, +type HyperpilotTask interface { + Run() +} + +type NormalTask struct { + BaseTask + Collector collector.NormalCollector +} + +type StreamTask struct { + BaseTask + Collector collector.StreamCollector +} + +func NewTask(task *common.NodeTask, id string, allMetricTypes []snap.Metric, - collector collector.Collector, + coll collector.Collector, processor processor.Processor, - agent *NodeAgent) (*HyperpilotTask, error) { + agent *NodeAgent) (HyperpilotTask, error) { + var pubs []*HyperpilotPublisher for _, pubId := range *task.Publish { @@ -61,18 +74,62 @@ func NewHyperpilotTask( return nil, errors.New(errMsg) } - return &HyperpilotTask{ + base := BaseTask{ Task: task, Id: id, - Collector: collector, Processor: processor, Publisher: pubs, CollectMetrics: cmts, Agent: agent, - }, nil + } + + if common.IsInstanceOf(coll, (*collector.NormalCollector)(nil)) { + return &NormalTask{ + BaseTask: base, + Collector: coll.(collector.NormalCollector), + }, nil + } else { + return &StreamTask{ + BaseTask: base, + Collector: coll.(collector.StreamCollector), + }, nil + } +} + +func (task *StreamTask) Run() { + + if err := task.collect(); err != nil { + log.Errorf("Unable to start stream collector {%s}: %s", task.Id, err.Error()) + task.FailureCount++ + task.reportError(err) + return + } + + go func() { + log.Infof("wait for processing incoming snap metric") + for { + select { + case metrics := <-task.Collector.Metrics(): + addTags(task.Task.Collect.Tags, metrics) + if task.Processor != nil { + var err error + task.FailureCount++ + metrics, err = task.process(metrics, task.Task.Process.Config) + if err != nil { + task.reportError(err) + log.Warnf("process metric fail, skip this time: %s", err.Error()) + continue + } + } + for _, publish := range task.Publisher { + publish.Put(metrics) + } + } + } + }() } -func (task *HyperpilotTask) Run() { +func (task *NormalTask) Run() { waitTime, err := time.ParseDuration(task.Task.Schedule.Interval) if err != nil { log.Warnf("Parse schedule interval {%s} fail, use default interval 5 seconds", @@ -163,7 +220,7 @@ func addTags(tags map[string]map[string]string, mts []snap.Metric) []snap.Metric return newMts } -func (task *HyperpilotTask) collect() ([]snap.Metric, error) { +func (task *NormalTask) collect() ([]snap.Metric, error) { collectMetrics, err := task.Collector.CollectMetrics(task.CollectMetrics) if err != nil { return nil, fmt.Errorf("Unable to collect metrics for %s: %s", task.Id, err.Error()) @@ -172,11 +229,19 @@ func (task *HyperpilotTask) collect() ([]snap.Metric, error) { return addTags(task.Task.Collect.Tags, collectMetrics), nil } -func (task *HyperpilotTask) process(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) { +func (task *StreamTask) collect() error { + if err := task.Collector.StreamMetrics(task.CollectMetrics); err != nil { + log.Warnf("") + return err + } + return nil +} + +func (task *BaseTask) process(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) { return task.Processor.Process(mts, cfg) } -func (task *HyperpilotTask) reportError(err error) { +func (task *BaseTask) reportError(err error) { report := common.TaskReport{ Id: task.Id, Plugin: task.Task.Process.PluginName, diff --git a/conf/task-test.json b/conf/ddagent-task.json similarity index 73% rename from conf/task-test.json rename to conf/ddagent-task.json index bdee8c8..64951f5 100644 --- a/conf/task-test.json +++ b/conf/ddagent-task.json @@ -6,14 +6,11 @@ "interval": "5s" }, "collect": { - "plugin": "cpu", + "plugin": "ddagent", "metrics": { - "/intel/procfs/cpu/*": {} + "/ddagent/*": {} }, "config": { - "/intel/procfs/cpu": { - "proc_path": "/proc" - } } }, "publish": [ diff --git a/conf/tasks.json b/conf/tasks.json deleted file mode 100644 index d9499a7..0000000 --- a/conf/tasks.json +++ /dev/null @@ -1,68 +0,0 @@ -{ - "tasks": [ - { - "id": "task1", - "schedule": { - "interval": "5s" - }, - "collect": { - "plugin": "snap-plugin-collector-docker", - "metrics": "/intel/docker/*/stats/cgroups/*", - "config": { - "endpoint": "unix:///var/run/docker.sock", - "procfs": "/proc" - }, - "tags": { - "nodename": "", - "deploymentId": "" - } - }, - "process": { - "plugin": "snap-average-counter-processor", - "config": { - "collect.namespaces": "default", - "collect.include_empty_namespace": true, - "collect.exclude_metrics": "*cpu_stats/cpu_shares, */cpuset_stats/*, */pids_stats/*, */cpu_usage/per_cpu/*", - "collect.exclude_metrics.except": "", - "average": "*/blkio_stats/*, */cpu_usage/*, */cpu_stats/throttling_data/*, */cgroups/memory_stats/*/failcnt, */cgroups/memory_stats/*/pgfault, */cgroups/memory_stats/*/pgmajfault, */cgroups/memory_stats/*/pgpgin, */cgroups/memory_stats/*/pgpgout, */cgroups/memory_stats/*/total_pgfault, */cgroups/memory_stats/*/total_pgmajfault, */cgroups/memory_stats/*/total_pgppin, */cgroups/memory_stats/*/total_pgpgout, */cgroups/hugetlb_stats/*/failcnt" - } - }, - "publish": [ - "influxsrv", - "influxsrv2" - ] - } - ], - "publish": [ - { - "id": "influxsrv1", - "plugin": "snap-plugin-publisher-influxdb", - "config": { - "host": "", - "port": 8086, - "database": "snapaverage", - "user": "root", - "password": "default", - "retention": "autogen", - "scheme": "http", - "https": false, - "skip-verify": false - } - }, - { - "id": "influxsrv2", - "plugin": "snap-plugin-publisher-influxdb", - "config": { - "host": "", - "port": 8186, - "database": "snapaverage", - "user": "root", - "password": "default", - "retention": "autogen", - "scheme": "http", - "https": false, - "skip-verify": false - } - } - ] -} \ No newline at end of file diff --git a/dockerfiles/local/Dockerfile b/dockerfiles/local/Dockerfile index dc9107c..62e41b4 100644 --- a/dockerfiles/local/Dockerfile +++ b/dockerfiles/local/Dockerfile @@ -1,5 +1,5 @@ FROM ubuntu:xenial -RUN apt-get update && apt-get -y install curl +RUN apt-get update && apt-get -y install curl netcat jq RUN mkdir -p /etc/node_agent COPY ./bin/linux/node-agent . diff --git a/pkg/collector/ddagent/ddagent.go b/pkg/collector/ddagent/ddagent.go new file mode 100644 index 0000000..98bc35c --- /dev/null +++ b/pkg/collector/ddagent/ddagent.go @@ -0,0 +1,114 @@ +package ddagent + +import ( + "errors" + "encoding/json" + "strings" + "time" + + "github.com/hyperpilotio/node-agent/pkg/snap" + "github.com/hyperpilotio/node-agent/pkg/collector/ddagent/protocol" + log "github.com/sirupsen/logrus" +) + +type DdAgent struct { + tcp protocol.Receiver + inMetric chan *snap.Metric + outMetrics chan []snap.Metric + done chan struct{} + isStarted bool +} + +func New() (*DdAgent, error) { + return &DdAgent{ + tcp: protocol.NewTCPListener(8888), + inMetric: make(chan *snap.Metric, 1000), + outMetrics: make(chan []snap.Metric, 1000), + done: make(chan struct{}), + isStarted: false, + }, nil +} + +func (d *DdAgent) GetMetricTypes(cfg snap.Config) ([]snap.Metric, error) { + mts := []snap.Metric{} + vals := []string{"ddagent"} + for _, val := range vals { + metric := snap.Metric{ + Namespace: snap.NewNamespace(val), + } + mts = append(mts, metric) + } + + return mts, nil +} + +func (d *DdAgent) StreamMetrics(mts []snap.Metric) error { + if d.isStarted { + return errors.New("server already started") + } + log.Info("Starting tcp to receive datadog metric") + if err := d.tcp.Start(); err != nil { + return err + } + d.isStarted = true + + go func() { + log.Infof("start goroutine to parse datadog metric") + for { + select { + case data := <-d.tcp.Data(): + metric, err := parseData(data, mts) + if err != nil { + log.Warnf(err.Error()) + continue + } + d.inMetric <- metric + case <-d.done: + break + } + } + }() + + // routine that dispatches statsd metrics to all available streams + go func() { + log.Infof("start goroutine to transform snap metric") + for { + select { + case m := <-d.inMetric: + d.outMetrics <- []snap.Metric{*m} + case <-d.done: + return + } + } + }() + return nil +} + +func (d *DdAgent) Metrics() chan []snap.Metric { + return d.outMetrics +} + +func parseData(data []byte, mts []snap.Metric) (*snap.Metric, error) { + var ddMetric Metric + var snapMetric snap.Metric + + if err := json.Unmarshal(data, &ddMetric); err != nil { + log.Errorf("Unmarshal receiving datadog metric {%s} failed: %s", string(data), err.Error()) + return nil, err + } + + ns := snap.NewNamespace("ddagent") + ns = ns.AddStaticElements(strings.Split(ddMetric.MetricName, ".")...) + + //todo: compare mts + //return nil, errors.New("") + + //todo: fill in tag from datadog + + snapMetric = snap.Metric{ + Namespace: ns, + Timestamp: time.Unix(ddMetric.Timestamp, 0), + Data: ddMetric.Value, + } + return &snapMetric, nil +} diff --git a/pkg/collector/ddagent/ddagent_test.go b/pkg/collector/ddagent/ddagent_test.go new file mode 100644 index 0000000..2ac41fc --- /dev/null +++ b/pkg/collector/ddagent/ddagent_test.go @@ -0,0 +1,14 @@ +package ddagent + +import "testing" + +func TestAPI(t *testing.T) { + d, _ := New() + + c := make(chan interface{}) + + d.StreamMetrics(nil) + + <-c + +} diff --git a/pkg/collector/ddagent/protocol/receiver.go b/pkg/collector/ddagent/protocol/receiver.go new file mode 100644 index 0000000..e7c0999 --- /dev/null +++ b/pkg/collector/ddagent/protocol/receiver.go @@ -0,0 +1,25 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protocol + +type Receiver interface { + Data() chan []byte + Start() error + Stop() +} diff --git a/pkg/collector/ddagent/protocol/tcp.go b/pkg/collector/ddagent/protocol/tcp.go new file mode 100644 index 0000000..a6ff50c --- /dev/null +++ b/pkg/collector/ddagent/protocol/tcp.go @@ -0,0 +1,160 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protocol + +import ( + "bufio" + "fmt" + "net" + "strings" + + "time" + + "io" + + log "github.com/sirupsen/logrus" +) + +type tcpListener struct { + port *int + data chan []byte + listener *net.TCPListener + done chan struct{} +} + +func NewTCPListener(port int) *tcpListener { + listener := &tcpListener{ + data: make(chan []byte, 100), + done: make(chan struct{}), + port: &port, + } + return listener +} + +func (t *tcpListener) Data() chan []byte { + return t.data +} + +func (t *tcpListener) Stop() { + close(t.done) +} + +func (t *tcpListener) listen() error { + if t.listener == nil { + addr := fmt.Sprintf("%v:0", "0.0.0.0") + if t.port != nil { + addr = fmt.Sprintf("%v:%v", "0.0.0.0", *t.port) + } + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return err + } + t.listener, err = net.ListenTCP("tcp", tcpAddr) + if err != nil { + return err + } + log.WithFields( + log.Fields{ + "addr": t.listener.Addr().String(), + }, + ).Debug("tcp listening started") + } + return nil +} + +func (t *tcpListener) handleConn(conn net.Conn) { + defer conn.Close() + reader := bufio.NewReader(conn) + for { + select { + case <-t.done: + break + default: + log.WithFields(log.Fields{ + "ReadDeadLine": time.Now().Add(1 * time.Minute), + }).Debug("reading line") + conn.SetReadDeadline(time.Now().Add(1 * time.Minute)) + line, err := reader.ReadBytes('\n') + if err != nil { + if err == io.EOF { + log.WithFields(log.Fields{ + "peer": conn.RemoteAddr().String(), + "line": string(line), + "msg": "detected EOF before newline", + }).Warn("invalid line") + } else { + log.WithFields(log.Fields{ + "peer": conn.RemoteAddr().String(), + }).Error(err) + } + return + } + if len(line) > 0 { + line = line[:len(line)-1] // removes trailing '/n' + select { + case t.data <- line: + log.WithFields(log.Fields{ + "peer": conn.RemoteAddr().String(), + "line": string(line), + }).Debug("received line") + default: + log.WithFields(log.Fields{ + "peer": conn.RemoteAddr().String(), + "line": line, + "channel depth": len(t.data), + }).Warn("channel full - discarding value") + } + } + } + } +} + +func (t *tcpListener) Start() error { + if err := t.listen(); err != nil { + return err + } + log.WithField("addr", t.listener.Addr().String()).Debug("started TCP listener") + + go func() { + L: + for { + select { + case <-t.done: + log.WithField("addr", t.listener.Addr().String()).Debug("stopped TCP listener") + t.listener.Close() + break L + default: + // listen for incoming requests + conn, err := t.listener.AcceptTCP() + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + break + } + log.WithFields(log.Fields{ + "addr": t.listener.Addr().String(), + }).Error(err) + break + } + // Handle connection + go t.handleConn(conn) + } + } + }() + return nil +} diff --git a/pkg/collector/ddagent/protocol/tcp_test.go b/pkg/collector/ddagent/protocol/tcp_test.go new file mode 100644 index 0000000..5c9fffb --- /dev/null +++ b/pkg/collector/ddagent/protocol/tcp_test.go @@ -0,0 +1,116 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protocol + +import ( + "net" + "testing" + + "time" + + log "github.com/sirupsen/logrus" + . "github.com/smartystreets/goconvey/convey" +) + +func TestTCPListen(t *testing.T) { + log.SetLevel(log.DebugLevel) + Convey("Setup TCP server and client", t, func() { + tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0") + So(err, ShouldBeNil) + So(tcpAddr, ShouldNotBeNil) + conn, err := net.ListenTCP("tcp", tcpAddr) + So(err, ShouldBeNil) + So(conn, ShouldNotBeNil) + port := 33334 + server := NewTCPListener(TCPListenerOption(conn), TCPListenPortOption(&port)) + So(server, ShouldNotBeNil) + err = server.Start() + time.Sleep(100 * time.Millisecond) + So(err, ShouldBeNil) + Convey("Send/receive messages (with newline)", func() { + msgs := []string{"hello\n", "foo\n", "bar\n"} + tcpAddr, err := net.ResolveTCPAddr("tcp", conn.Addr().String()) + So(err, ShouldBeNil) + So(tcpAddr, ShouldNotBeNil) + clientConn, err := net.DialTCP("tcp", nil, tcpAddr) + So(err, ShouldBeNil) + for _, msg := range msgs { + _, err := clientConn.Write([]byte(msg)) + So(err, ShouldBeNil) + } + for _, msg := range msgs { + select { + case data := <-server.data: + So(data, ShouldResemble, []byte(msg[:len(msg)-1])) + case <-time.After(time.Millisecond * 200): + t.Fatalf("timed out while reading sent data") + } + } + So(len(server.data), ShouldEqual, 0) + So(len(server.Data()), ShouldEqual, 0) + + Convey("without newline", func() { + msgs := []string{"hello", "foo", "bar"} + for _, msg := range msgs { + _, err := clientConn.Write([]byte(msg)) + So(err, ShouldBeNil) + } + select { + case <-server.data: + t.Fatalf("messages without a newline should be ignored") + case <-time.After(time.Millisecond * 100): + break + } + }) + }) + server.Stop() + reachedDone := false + select { + case <-server.done: + time.Sleep(100 * time.Millisecond) + reachedDone = true + case <-time.After(100 * time.Millisecond): + t.Error("Timed out waiting for TCP server to stop") + } + So(reachedDone, ShouldBeTrue) + }) + + Convey("Setup failing TCP server", t, func(c C) { + //good ResolveTCPAddr + tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0") + So(err, ShouldBeNil) + So(tcpAddr, ShouldNotBeNil) + + //bad server.Start + BadConn, err := net.ListenTCP("tcppct", tcpAddr) + So(err, ShouldNotBeNil) + + //start server with badConn + listenPort := 5 + server := NewTCPListener(TCPListenerOption(BadConn), TCPListenPortOption(&listenPort)) + err = server.Start() + c.So(err, ShouldNotBeNil) + + //start server with no conn + server = NewTCPListener() + err = server.Start() + c.So(err, ShouldBeNil) + }) + +} diff --git a/pkg/collector/ddagent/types.go b/pkg/collector/ddagent/types.go new file mode 100644 index 0000000..054f3ab --- /dev/null +++ b/pkg/collector/ddagent/types.go @@ -0,0 +1,8 @@ +package ddagent + +type Metric struct { + MetricName string `json:"metric"` + Value string `json:"value"` + Timestamp int64 `json:"time_stamp"` + Host string `json:"host"` +} diff --git a/pkg/collector/factory.go b/pkg/collector/factory.go index 61563ff..2957d77 100644 --- a/pkg/collector/factory.go +++ b/pkg/collector/factory.go @@ -11,14 +11,25 @@ import ( "github.com/hyperpilotio/node-agent/pkg/collector/psutil" "github.com/hyperpilotio/node-agent/pkg/collector/use" "github.com/hyperpilotio/node-agent/pkg/snap" + "github.com/hyperpilotio/node-agent/pkg/collector/ddagent" ) // Collector is a plugin which is the source of new data in the Snap pipeline. type Collector interface { GetMetricTypes(snap.Config) ([]snap.Metric, error) +} + +type NormalCollector interface { + Collector CollectMetrics([]snap.Metric) ([]snap.Metric, error) } +type StreamCollector interface { + Collector + StreamMetrics([]snap.Metric) error + Metrics() chan []snap.Metric +} + func NewCollector(name string) (Collector, error) { switch name { case "cpu": @@ -35,6 +46,8 @@ func NewCollector(name string) (Collector, error) { return use.New() case "goddd": return goddd.New() + case "ddagent": + return ddagent.New() default: return nil, errors.New("Unsupported collector type: " + name) } diff --git a/pkg/common/util.go b/pkg/common/util.go new file mode 100644 index 0000000..e750414 --- /dev/null +++ b/pkg/common/util.go @@ -0,0 +1,7 @@ +package common + +import "reflect" + +func IsInstanceOf(objectPtr, typePtr interface{}) bool { + return reflect.TypeOf(objectPtr) == reflect.TypeOf(typePtr) +} From fe82fea6f77060bfaf75776df4d95d17d97c4643 Mon Sep 17 00:00:00 2001 From: Jimmy Chuang Date: Wed, 24 Jan 2018 16:23:53 -0800 Subject: [PATCH 2/3] ddagent port configuration --- conf/ddagent-task.json | 4 +- pkg/collector/ddagent/ddagent.go | 22 +++- pkg/collector/ddagent/protocol/receiver.go | 25 ----- pkg/collector/ddagent/protocol/tcp_test.go | 116 -------------------- pkg/collector/ddagent/{protocol => }/tcp.go | 19 ++-- pkg/common/task_model.go | 2 +- 6 files changed, 31 insertions(+), 157 deletions(-) delete mode 100644 pkg/collector/ddagent/protocol/receiver.go delete mode 100644 pkg/collector/ddagent/protocol/tcp_test.go rename pkg/collector/ddagent/{protocol => }/tcp.go (90%) diff --git a/conf/ddagent-task.json b/conf/ddagent-task.json index 64951f5..8db69f8 100644 --- a/conf/ddagent-task.json +++ b/conf/ddagent-task.json @@ -2,15 +2,13 @@ "tasks": [ { "id": "task1", - "schedule": { - "interval": "5s" - }, "collect": { "plugin": "ddagent", "metrics": { "/ddagent/*": {} }, "config": { + "port": "8877" } }, "publish": [ diff --git a/pkg/collector/ddagent/ddagent.go b/pkg/collector/ddagent/ddagent.go index 98bc35c..42de6f6 100644 --- a/pkg/collector/ddagent/ddagent.go +++ b/pkg/collector/ddagent/ddagent.go @@ -7,29 +7,45 @@ import ( "time" "github.com/hyperpilotio/node-agent/pkg/snap" - "github.com/hyperpilotio/node-agent/pkg/collector/ddagent/protocol" log "github.com/sirupsen/logrus" + "strconv" ) type DdAgent struct { - tcp protocol.Receiver + tcp *TCPListener inMetric chan *snap.Metric outMetrics chan []snap.Metric done chan struct{} isStarted bool + isInit bool } func New() (*DdAgent, error) { return &DdAgent{ - tcp: protocol.NewTCPListener(8888), + tcp: NewTCPListener(), inMetric: make(chan *snap.Metric, 1000), outMetrics: make(chan []snap.Metric, 1000), done: make(chan struct{}), isStarted: false, + isInit: false, }, nil } func (d *DdAgent) GetMetricTypes(cfg snap.Config) ([]snap.Metric, error) { + + if !d.isInit { + port, err := cfg.GetString("port") + if err != nil { + log.Warnf("Get Port configure failure: %s.", err.Error()) + log.Warnf("Use default port %d", *d.tcp.port) + } else { + p, _ := strconv.Atoi(port) + d.tcp.port = &p + log.Infof("Use configured port number %d ", p) + } + d.isInit = true + } + mts := []snap.Metric{} vals := []string{"ddagent"} for _, val := range vals { diff --git a/pkg/collector/ddagent/protocol/receiver.go b/pkg/collector/ddagent/protocol/receiver.go deleted file mode 100644 index e7c0999..0000000 --- a/pkg/collector/ddagent/protocol/receiver.go +++ /dev/null @@ -1,25 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - -Copyright 2017 Intel Corporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package protocol - -type Receiver interface { - Data() chan []byte - Start() error - Stop() -} diff --git a/pkg/collector/ddagent/protocol/tcp_test.go b/pkg/collector/ddagent/protocol/tcp_test.go deleted file mode 100644 index 5c9fffb..0000000 --- a/pkg/collector/ddagent/protocol/tcp_test.go +++ /dev/null @@ -1,116 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - -Copyright 2017 Intel Corporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package protocol - -import ( - "net" - "testing" - - "time" - - log "github.com/sirupsen/logrus" - . "github.com/smartystreets/goconvey/convey" -) - -func TestTCPListen(t *testing.T) { - log.SetLevel(log.DebugLevel) - Convey("Setup TCP server and client", t, func() { - tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0") - So(err, ShouldBeNil) - So(tcpAddr, ShouldNotBeNil) - conn, err := net.ListenTCP("tcp", tcpAddr) - So(err, ShouldBeNil) - So(conn, ShouldNotBeNil) - port := 33334 - server := NewTCPListener(TCPListenerOption(conn), TCPListenPortOption(&port)) - So(server, ShouldNotBeNil) - err = server.Start() - time.Sleep(100 * time.Millisecond) - So(err, ShouldBeNil) - Convey("Send/receive messages (with newline)", func() { - msgs := []string{"hello\n", "foo\n", "bar\n"} - tcpAddr, err := net.ResolveTCPAddr("tcp", conn.Addr().String()) - So(err, ShouldBeNil) - So(tcpAddr, ShouldNotBeNil) - clientConn, err := net.DialTCP("tcp", nil, tcpAddr) - So(err, ShouldBeNil) - for _, msg := range msgs { - _, err := clientConn.Write([]byte(msg)) - So(err, ShouldBeNil) - } - for _, msg := range msgs { - select { - case data := <-server.data: - So(data, ShouldResemble, []byte(msg[:len(msg)-1])) - case <-time.After(time.Millisecond * 200): - t.Fatalf("timed out while reading sent data") - } - } - So(len(server.data), ShouldEqual, 0) - So(len(server.Data()), ShouldEqual, 0) - - Convey("without newline", func() { - msgs := []string{"hello", "foo", "bar"} - for _, msg := range msgs { - _, err := clientConn.Write([]byte(msg)) - So(err, ShouldBeNil) - } - select { - case <-server.data: - t.Fatalf("messages without a newline should be ignored") - case <-time.After(time.Millisecond * 100): - break - } - }) - }) - server.Stop() - reachedDone := false - select { - case <-server.done: - time.Sleep(100 * time.Millisecond) - reachedDone = true - case <-time.After(100 * time.Millisecond): - t.Error("Timed out waiting for TCP server to stop") - } - So(reachedDone, ShouldBeTrue) - }) - - Convey("Setup failing TCP server", t, func(c C) { - //good ResolveTCPAddr - tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0") - So(err, ShouldBeNil) - So(tcpAddr, ShouldNotBeNil) - - //bad server.Start - BadConn, err := net.ListenTCP("tcppct", tcpAddr) - So(err, ShouldNotBeNil) - - //start server with badConn - listenPort := 5 - server := NewTCPListener(TCPListenerOption(BadConn), TCPListenPortOption(&listenPort)) - err = server.Start() - c.So(err, ShouldNotBeNil) - - //start server with no conn - server = NewTCPListener() - err = server.Start() - c.So(err, ShouldBeNil) - }) - -} diff --git a/pkg/collector/ddagent/protocol/tcp.go b/pkg/collector/ddagent/tcp.go similarity index 90% rename from pkg/collector/ddagent/protocol/tcp.go rename to pkg/collector/ddagent/tcp.go index a6ff50c..8f00af3 100644 --- a/pkg/collector/ddagent/protocol/tcp.go +++ b/pkg/collector/ddagent/tcp.go @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package protocol +package ddagent import ( "bufio" @@ -31,15 +31,16 @@ import ( log "github.com/sirupsen/logrus" ) -type tcpListener struct { +type TCPListener struct { port *int data chan []byte listener *net.TCPListener done chan struct{} } -func NewTCPListener(port int) *tcpListener { - listener := &tcpListener{ +func NewTCPListener() *TCPListener { + port := 8888 + listener := &TCPListener{ data: make(chan []byte, 100), done: make(chan struct{}), port: &port, @@ -47,15 +48,15 @@ func NewTCPListener(port int) *tcpListener { return listener } -func (t *tcpListener) Data() chan []byte { +func (t *TCPListener) Data() chan []byte { return t.data } -func (t *tcpListener) Stop() { +func (t *TCPListener) Stop() { close(t.done) } -func (t *tcpListener) listen() error { +func (t *TCPListener) listen() error { if t.listener == nil { addr := fmt.Sprintf("%v:0", "0.0.0.0") if t.port != nil { @@ -78,7 +79,7 @@ func (t *tcpListener) listen() error { return nil } -func (t *tcpListener) handleConn(conn net.Conn) { +func (t *TCPListener) handleConn(conn net.Conn) { defer conn.Close() reader := bufio.NewReader(conn) for { @@ -125,7 +126,7 @@ func (t *tcpListener) handleConn(conn net.Conn) { } } -func (t *tcpListener) Start() error { +func (t *TCPListener) Start() error { if err := t.listen(); err != nil { return err } diff --git a/pkg/common/task_model.go b/pkg/common/task_model.go index d8dffd3..94f5ae6 100644 --- a/pkg/common/task_model.go +++ b/pkg/common/task_model.go @@ -30,7 +30,7 @@ type Publish struct { type NodeTask struct { Id string `json:"id"` - Schedule Schedule `json:"schedule"` + Schedule Schedule `json:"schedule,omitempty"` Collect *Collect `json:"collect"` Process *Process `json:"process"` Publish *[]string `json:"publish"` From 3fee1328f252340d1c7f8b01f7514e1c7ad75a66 Mon Sep 17 00:00:00 2001 From: Jimmy Chuang Date: Wed, 24 Jan 2018 17:18:11 -0800 Subject: [PATCH 3/3] support tag in ddagent metric --- cmd/task.go | 2 +- pkg/collector/ddagent/ddagent.go | 16 +++++++++------- pkg/collector/ddagent/types.go | 9 +++++---- pkg/collector/factory.go | 2 +- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cmd/task.go b/cmd/task.go index 9f28459..2830880 100644 --- a/cmd/task.go +++ b/cmd/task.go @@ -230,7 +230,7 @@ func (task *NormalTask) collect() ([]snap.Metric, error) { } func (task *StreamTask) collect() error { - if err := task.Collector.StreamMetrics(task.CollectMetrics); err != nil { + if err := task.Collector.StreamMetrics(); err != nil { log.Warnf("") return err } diff --git a/pkg/collector/ddagent/ddagent.go b/pkg/collector/ddagent/ddagent.go index 42de6f6..c4510b3 100644 --- a/pkg/collector/ddagent/ddagent.go +++ b/pkg/collector/ddagent/ddagent.go @@ -58,7 +58,7 @@ func (d *DdAgent) GetMetricTypes(cfg snap.Config) ([]snap.Metric, error) { return mts, nil } -func (d *DdAgent) StreamMetrics(mts []snap.Metric) error { +func (d *DdAgent) StreamMetrics() error { if d.isStarted { return errors.New("server already started") } @@ -73,7 +73,7 @@ func (d *DdAgent) StreamMetrics(mts []snap.Metric) error { for { select { case data := <-d.tcp.Data(): - metric, err := parseData(data, mts) + metric, err := parseData(data) if err != nil { log.Warnf(err.Error()) continue @@ -104,7 +104,7 @@ func (d *DdAgent) Metrics() chan []snap.Metric { return d.outMetrics } -func parseData(data []byte, mts []snap.Metric) (*snap.Metric, error) { +func parseData(data []byte) (*snap.Metric, error) { var ddMetric Metric var snapMetric snap.Metric @@ -116,13 +116,15 @@ func parseData(data []byte, mts []snap.Metric) (*snap.Metric, error) { ns := snap.NewNamespace("ddagent") ns = ns.AddStaticElements(strings.Split(ddMetric.MetricName, ".")...) - //todo: compare mts - //return nil, errors.New("") - - //todo: fill in tag from datadog + tag := make(map[string]string) + tag["host"] = ddMetric.Host + for k, v := range ddMetric.Tags { + tag[k] = v + } snapMetric = snap.Metric{ Namespace: ns, + Tags: tag, Timestamp: time.Unix(ddMetric.Timestamp, 0), Data: ddMetric.Value, } diff --git a/pkg/collector/ddagent/types.go b/pkg/collector/ddagent/types.go index 054f3ab..bc7e579 100644 --- a/pkg/collector/ddagent/types.go +++ b/pkg/collector/ddagent/types.go @@ -1,8 +1,9 @@ package ddagent type Metric struct { - MetricName string `json:"metric"` - Value string `json:"value"` - Timestamp int64 `json:"time_stamp"` - Host string `json:"host"` + MetricName string `json:"metric"` + Value string `json:"value"` + Timestamp int64 `json:"time_stamp"` + Host string `json:"host"` + Tags map[string]string `json:"tags,omitempty"` } diff --git a/pkg/collector/factory.go b/pkg/collector/factory.go index 2957d77..7d61c2c 100644 --- a/pkg/collector/factory.go +++ b/pkg/collector/factory.go @@ -26,7 +26,7 @@ type NormalCollector interface { type StreamCollector interface { Collector - StreamMetrics([]snap.Metric) error + StreamMetrics() error Metrics() chan []snap.Metric }