diff --git a/cmd/agent.go b/cmd/agent.go
index 40632f7..afcef74 100644
--- a/cmd/agent.go
+++ b/cmd/agent.go
@@ -133,7 +133,7 @@ func (nodeAgent *NodeAgent) CreateTask(task *common.NodeTask) error {
 		}
 	}
 
-	newTask, err := NewHyperpilotTask(task, task.Id, metricTypes,
+	newTask, err := NewTask(task, task.Id, metricTypes,
 		taskCollector, taskProcessor, taskAnalyzer, nodeAgent)
 	if err != nil {
 		return errors.New(fmt.Sprintf("Unable to new agent task {%s}: %s", task.Id, err.Error()))
@@ -143,7 +143,7 @@ func (nodeAgent *NodeAgent) CreateTask(task *common.NodeTask) error {
 		log.Warnf("Task id {%s} is duplicated, skip this task", task.Id)
 		return nil
 	}
-	nodeAgent.Tasks[task.Id] = newTask
+	nodeAgent.Tasks[task.Id] = &newTask
 	return nil
 }
 
@@ -167,7 +167,7 @@ func (nodeAgent *NodeAgent) CreatePublisher(p *common.Publish) error {
 
 func (nodeAgent *NodeAgent) Run() {
 	for _, task := range nodeAgent.Tasks {
-		task.Run()
+		(*task).Run()
 	}
 }
 
diff --git a/cmd/task.go b/cmd/task.go
index f9b41e6..27b0c77 100644
--- a/cmd/task.go
+++ b/cmd/task.go
@@ -20,10 +20,9 @@ type PublishConfig struct {
 	AnalyzerPublisher []*HyperpilotPublisher
 }
 
-type HyperpilotTask struct {
+type BaseTask struct {
 	Task          *common.NodeTask
 	Id            string
-	Collector     collector.Collector
 	Processor     processor.Processor
 	Analyzer      analyzer.Analyzer
 	PublishConfig *PublishConfig
@@ -32,14 +31,27 @@ type HyperpilotTask struct {
 	Agent         *NodeAgent
 }
 
-func NewHyperpilotTask(
-	task *common.NodeTask,
+type HyperpilotTask interface {
+	Run()
+}
+
+type NormalTask struct {
+	BaseTask
+	Collector collector.NormalCollector
+}
+
+type StreamTask struct {
+	BaseTask
+	Collector collector.StreamCollector
+}
+
+func NewTask(task *common.NodeTask,
 	id string,
 	allMetricTypes []snap.Metric,
-	collector collector.Collector,
+	coll collector.Collector,
 	processor processor.Processor,
 	analyzer analyzer.Analyzer,
-	agent *NodeAgent) (*HyperpilotTask, error) {
+	agent *NodeAgent) (HyperpilotTask, error) {
 	var pubs []*HyperpilotPublisher
 	for _, pubId := range *task.Publish {
 		p, ok := agent.Publishers[pubId]
@@ -80,10 +92,9 @@ func NewHyperpilotTask(
 		return nil, errors.New(errMsg)
 	}
 
-	return &HyperpilotTask{
+	base := BaseTask{
 		Task:      task,
 		Id:        id,
-		Collector: collector,
 		Processor: processor,
 		Analyzer:  analyzer,
 		PublishConfig: &PublishConfig{
@@ -92,10 +103,70 @@ func NewHyperpilotTask(
 		},
 		CollectMetrics: cmts,
 		Agent:          agent,
-	}, nil
+	}
+
+	if common.IsInstanceOf(coll, (*collector.NormalCollector)(nil)) {
+		return &NormalTask{
+			BaseTask:  base,
+			Collector: coll.(collector.NormalCollector),
+		}, nil
+	} else {
+		return &StreamTask{
+			BaseTask:  base,
+			Collector: coll.(collector.StreamCollector),
+		}, nil
+	}
 }
 
-func (task *HyperpilotTask) Run() {
+func (task *StreamTask) Run() {
+
+	if err := task.collect(); err != nil {
+		log.Errorf("Unable to start stream collector {%s}: %s", task.Id, err.Error())
+		task.FailureCount++
+		task.reportError(err)
+		return
+	}
+
+	go func() {
+		log.Infof("waiting to process incoming snap metrics")
+		for {
+			select {
+			case metrics := <-task.Collector.Metrics():
+				addTags(task.Task.Collect.Tags, metrics)
+				if task.Processor != nil {
+					var err error
+					metrics, err = task.process(metrics, task.Task.Process.Config)
+					if err != nil {
+						task.FailureCount++
+						task.reportError(err)
+						log.Warnf("process metric fail, skip this time: %s", err.Error())
+						continue
+					}
+				}
+				for _, publish := range task.PublishConfig.Publisher {
+					publish.Put(metrics)
+				}
+
+				if task.Analyzer != nil {
+					derivedMetrics, err := task.analyze(metrics, task.Task.Analyze.Config)
+					if err != nil {
+						task.FailureCount++
+						task.reportError(err)
+						log.Warnf("analyze metric fail for %s, skip this time: %s", task.Task.Id, err.Error())
+						continue
+					}
+					for _, publish := range task.PublishConfig.AnalyzerPublisher {
+						if len(derivedMetrics) > 0 {
+							publish.Put(derivedMetrics)
+						}
+					}
+				}
+			}
+		}
+	}()
+}
+
+func (task *NormalTask) Run() {
 	waitTime, err := time.ParseDuration(task.Task.Schedule.Interval)
 	if err != nil {
 		log.Warnf("Parse schedule interval {%s} fail, use default interval 5 seconds",
@@ -204,7 +275,7 @@ func addTags(tags map[string]map[string]string, mts []snap.Metric) []snap.Metric
 	return newMts
 }
 
-func (task *HyperpilotTask) collect() ([]snap.Metric, error) {
+func (task *NormalTask) collect() ([]snap.Metric, error) {
 	collectMetrics, err := task.Collector.CollectMetrics(task.CollectMetrics)
 	if err != nil {
 		return nil, fmt.Errorf("Unable to collect metrics for %s: %s", task.Id, err.Error())
@@ -213,15 +284,23 @@
 	return addTags(task.Task.Collect.Tags, collectMetrics), nil
 }
 
-func (task *HyperpilotTask) process(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
+func (task *StreamTask) collect() error {
+	if err := task.Collector.StreamMetrics(); err != nil {
+		log.Warnf("Unable to start stream collector for task %s: %s", task.Id, err.Error())
+		return err
+	}
+	return nil
+}
+
+func (task *BaseTask) process(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
 	return task.Processor.Process(mts, cfg)
 }
 
-func (task *HyperpilotTask) analyze(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
+func (task *BaseTask) analyze(mts []snap.Metric, cfg snap.Config) ([]snap.Metric, error) {
 	return task.Analyzer.Analyze(mts, cfg)
 }
 
-func (task *HyperpilotTask) reportError(err error) {
+func (task *BaseTask) reportError(err error) {
 	report := common.TaskReport{
 		Id:     task.Id,
 		Plugin: task.Task.Process.PluginName,
diff --git a/conf/task-test.json b/conf/ddagent-task.json
similarity index 63%
rename from conf/task-test.json
rename to conf/ddagent-task.json
index bdee8c8..8db69f8 100644
--- a/conf/task-test.json
+++ b/conf/ddagent-task.json
@@ -2,18 +2,13 @@
   "tasks": [
     {
       "id": "task1",
-      "schedule": {
-        "interval": "5s"
-      },
       "collect": {
-        "plugin": "cpu",
+        "plugin": "ddagent",
         "metrics": {
-          "/intel/procfs/cpu/*": {}
+          "/ddagent/*": {}
         },
         "config": {
-          "/intel/procfs/cpu": {
-            "proc_path": "/proc"
-          }
+          "port": "8877"
         }
       },
       "publish": [
diff --git a/conf/tasks.json b/conf/tasks.json
deleted file mode 100644
index d9499a7..0000000
--- a/conf/tasks.json
+++ /dev/null
@@ -1,68 +0,0 @@
-{
-  "tasks": [
-    {
-      "id": "task1",
-      "schedule": {
-        "interval": "5s"
-      },
-      "collect": {
-        "plugin": "snap-plugin-collector-docker",
-        "metrics": "/intel/docker/*/stats/cgroups/*",
-        "config": {
-          "endpoint": "unix:///var/run/docker.sock",
-          "procfs": "/proc"
-        },
-        "tags": {
-          "nodename": "",
-          "deploymentId": ""
-        }
-      },
-      "process": {
-        "plugin": "snap-average-counter-processor",
-        "config": {
-          "collect.namespaces": "default",
-          "collect.include_empty_namespace": true,
-          "collect.exclude_metrics": "*cpu_stats/cpu_shares, */cpuset_stats/*, */pids_stats/*, */cpu_usage/per_cpu/*",
-          "collect.exclude_metrics.except": "",
-          "average": "*/blkio_stats/*, */cpu_usage/*, */cpu_stats/throttling_data/*, */cgroups/memory_stats/*/failcnt, */cgroups/memory_stats/*/pgfault, */cgroups/memory_stats/*/pgmajfault, */cgroups/memory_stats/*/pgpgin, */cgroups/memory_stats/*/pgpgout, */cgroups/memory_stats/*/total_pgfault, */cgroups/memory_stats/*/total_pgmajfault, */cgroups/memory_stats/*/total_pgppin, */cgroups/memory_stats/*/total_pgpgout, */cgroups/hugetlb_stats/*/failcnt"
-        }
-      },
-      "publish": [
-        "influxsrv",
-        "influxsrv2"
-      ]
-    }
-  ],
-  "publish": [
-    {
-      "id": "influxsrv1",
-      "plugin": "snap-plugin-publisher-influxdb",
-      "config": {
-        "host": "",
-        "port": 8086,
-        "database": "snapaverage",
-        "user": "root",
-        "password": "default",
-        "retention": "autogen",
-        "scheme": "http",
-        "https": false,
-        "skip-verify": false
-      }
-    },
-    {
-      "id": "influxsrv2",
-      "plugin": "snap-plugin-publisher-influxdb",
-      "config": {
-        "host": "",
-        "port": 8186,
-        "database": "snapaverage",
-        "user": "root",
-        "password": "default",
-        "retention": "autogen",
-        "scheme": "http",
-        "https": false,
-        "skip-verify": false
-      }
-    }
-  ]
-}
\ No newline at end of file
diff --git a/dockerfiles/local/Dockerfile b/dockerfiles/local/Dockerfile
index dc9107c..62e41b4 100644
--- a/dockerfiles/local/Dockerfile
+++ b/dockerfiles/local/Dockerfile
@@ -1,5 +1,5 @@
 FROM ubuntu:xenial
-RUN apt-get update && apt-get -y install curl
+RUN apt-get update && apt-get -y install curl netcat jq
 RUN mkdir -p /etc/node_agent
 
 COPY ./bin/linux/node-agent .
diff --git a/pkg/collector/ddagent/ddagent.go b/pkg/collector/ddagent/ddagent.go
new file mode 100644
index 0000000..c4510b3
--- /dev/null
+++ b/pkg/collector/ddagent/ddagent.go
@@ -0,0 +1,132 @@
+package ddagent
+
+import (
+	"encoding/json"
+	"errors"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/hyperpilotio/node-agent/pkg/snap"
+	log "github.com/sirupsen/logrus"
+)
+
+type DdAgent struct {
+	tcp        *TCPListener
+	inMetric   chan *snap.Metric
+	outMetrics chan []snap.Metric
+	done       chan struct{}
+	isStarted  bool
+	isInit     bool
+}
+
+func New() (*DdAgent, error) {
+	return &DdAgent{
+		tcp:        NewTCPListener(),
+		inMetric:   make(chan *snap.Metric, 1000),
+		outMetrics: make(chan []snap.Metric, 1000),
+		done:       make(chan struct{}),
+		isStarted:  false,
+		isInit:     false,
+	}, nil
+}
+
+func (d *DdAgent) GetMetricTypes(cfg snap.Config) ([]snap.Metric, error) {
+
+	if !d.isInit {
+		port, err := cfg.GetString("port")
+		if err != nil {
+			log.Warnf("Unable to read port from config: %s.", err.Error())
+			log.Warnf("Use default port %d", *d.tcp.port)
+		} else {
+			p, _ := strconv.Atoi(port)
+			d.tcp.port = &p
+			log.Infof("Use configured port number %d", p)
+		}
+		d.isInit = true
+	}
+
+	mts := []snap.Metric{}
+	vals := []string{"ddagent"}
+	for _, val := range vals {
+		metric := snap.Metric{
+			Namespace: snap.NewNamespace(val),
+		}
+		mts = append(mts, metric)
+	}
+
+	return mts, nil
+}
+
+func (d *DdAgent) StreamMetrics() error {
+	if d.isStarted {
+		return errors.New("server already started")
+	}
+	log.Info("Starting tcp listener to receive datadog metrics")
+	if err := d.tcp.Start(); err != nil {
+		return err
+	}
+	d.isStarted = true
+
+	go func() {
+		log.Infof("start goroutine to parse datadog metric")
+		for {
+			select {
+			case data := <-d.tcp.Data():
+				metric, err := parseData(data)
+				if err != nil {
+					log.Warnf(err.Error())
+					continue
+				}
+				d.inMetric <- metric
+			case <-d.done:
+				return
+			}
+		}
+	}()
+
+	// routine that dispatches parsed datadog metrics to the outgoing stream
+	go func() {
+		log.Infof("start goroutine to transform snap metric")
+		for {
+			select {
+			case m := <-d.inMetric:
+				d.outMetrics <- []snap.Metric{*m}
+			case <-d.done:
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+func (d *DdAgent) Metrics() chan []snap.Metric {
+	return d.outMetrics
+}
+
+func parseData(data []byte) (*snap.Metric, error) {
+	var ddMetric Metric
+	var snapMetric snap.Metric
+
+	if err := json.Unmarshal(data, &ddMetric); err != nil {
+		log.Errorf("Unmarshal receiving datadog metric {%s} failed: %s", string(data), err.Error())
+		return nil, err
+	}
+
+	ns := snap.NewNamespace("ddagent")
+	ns = ns.AddStaticElements(strings.Split(ddMetric.MetricName, ".")...)
+
+	tag := make(map[string]string)
+	tag["host"] = ddMetric.Host
+	for k, v := range ddMetric.Tags {
+		tag[k] = v
+	}
+
+	snapMetric = snap.Metric{
+		Namespace: ns,
+		Tags:      tag,
+		Timestamp: time.Unix(ddMetric.Timestamp, 0),
+		Data:      ddMetric.Value,
+	}
+	return &snapMetric, nil
+}
diff --git a/pkg/collector/ddagent/ddagent_test.go b/pkg/collector/ddagent/ddagent_test.go
new file mode 100644
index 0000000..2ac41fc
--- /dev/null
+++ b/pkg/collector/ddagent/ddagent_test.go
@@ -0,0 +1,14 @@
+package ddagent
+
+import "testing"
+
+func TestAPI(t *testing.T) {
+	d, err := New()
+	if err != nil {
+		t.Fatalf("Unable to create ddagent collector: %s", err.Error())
+	}
+
+	if err := d.StreamMetrics(); err != nil {
+		t.Fatalf("Unable to start stream collector: %s", err.Error())
+	}
+}
diff --git a/pkg/collector/ddagent/tcp.go b/pkg/collector/ddagent/tcp.go
new file mode 100644
index 0000000..8f00af3
--- /dev/null
+++ b/pkg/collector/ddagent/tcp.go
@@ -0,0 +1,161 @@
+/*
+http://www.apache.org/licenses/LICENSE-2.0.txt
+
+Copyright 2017 Intel Corporation
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ddagent
+
+import (
+	"bufio"
+	"fmt"
+	"net"
+	"strings"
+
+	"time"
+
+	"io"
+
+	log "github.com/sirupsen/logrus"
+)
+
+type TCPListener struct {
+	port     *int
+	data     chan []byte
+	listener *net.TCPListener
+	done     chan struct{}
+}
+
+func NewTCPListener() *TCPListener {
+	port := 8888
+	listener := &TCPListener{
+		data: make(chan []byte, 100),
+		done: make(chan struct{}),
+		port: &port,
+	}
+	return listener
+}
+
+func (t *TCPListener) Data() chan []byte {
+	return t.data
+}
+
+func (t *TCPListener) Stop() {
+	close(t.done)
+}
+
+func (t *TCPListener) listen() error {
+	if t.listener == nil {
+		addr := fmt.Sprintf("%v:0", "0.0.0.0")
+		if t.port != nil {
+			addr = fmt.Sprintf("%v:%v", "0.0.0.0", *t.port)
+		}
+		tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
+		if err != nil {
+			return err
+		}
+		t.listener, err = net.ListenTCP("tcp", tcpAddr)
+		if err != nil {
+			return err
+		}
+		log.WithFields(
+			log.Fields{
+				"addr": t.listener.Addr().String(),
+			},
+		).Debug("tcp listening started")
+	}
+	return nil
+}
+
+func (t *TCPListener) handleConn(conn net.Conn) {
+	defer conn.Close()
+	reader := bufio.NewReader(conn)
+	for {
+		select {
+		case <-t.done:
+			return
+		default:
+			log.WithFields(log.Fields{
+				"ReadDeadLine": time.Now().Add(1 * time.Minute),
+			}).Debug("reading line")
+			conn.SetReadDeadline(time.Now().Add(1 * time.Minute))
+			line, err := reader.ReadBytes('\n')
+			if err != nil {
+				if err == io.EOF {
+					log.WithFields(log.Fields{
+						"peer": conn.RemoteAddr().String(),
+						"line": string(line),
+						"msg":  "detected EOF before newline",
+					}).Warn("invalid line")
+				} else {
+					log.WithFields(log.Fields{
+						"peer": conn.RemoteAddr().String(),
+					}).Error(err)
+				}
+				return
+			}
+			if len(line) > 0 {
+				line = line[:len(line)-1] // removes trailing '\n'
+				select {
+				case t.data <- line:
+					log.WithFields(log.Fields{
+						"peer": conn.RemoteAddr().String(),
+						"line": string(line),
+					}).Debug("received line")
+				default:
+					log.WithFields(log.Fields{
+						"peer":          conn.RemoteAddr().String(),
+						"line":          line,
+						"channel depth": len(t.data),
+					}).Warn("channel full - discarding value")
+				}
+			}
+		}
+	}
+}
+
+func (t *TCPListener) Start() error {
+	if err := t.listen(); err != nil {
+		return err
+	}
+	log.WithField("addr", t.listener.Addr().String()).Debug("started TCP listener")
+
+	go func() {
+	L:
+		for {
+			select {
+			case <-t.done:
+				log.WithField("addr", t.listener.Addr().String()).Debug("stopped TCP listener")
+				t.listener.Close()
+				break L
+			default:
+				// listen for incoming requests
+				conn, err := t.listener.AcceptTCP()
+				if err != nil {
+					if strings.Contains(err.Error(), "use of closed network connection") {
+						break L
+					}
+					log.WithFields(log.Fields{
+						"addr": t.listener.Addr().String(),
+					}).Error(err)
+					break
+				}
+				// Handle connection
+				go t.handleConn(conn)
+			}
+		}
+	}()
+	return nil
+}
diff --git a/pkg/collector/ddagent/types.go b/pkg/collector/ddagent/types.go
new file mode 100644
index 0000000..bc7e579
--- /dev/null
+++ b/pkg/collector/ddagent/types.go
@@ -0,0 +1,9 @@
+package ddagent
+
+type Metric struct {
+	MetricName string            `json:"metric"`
+	Value      string            `json:"value"`
+	Timestamp  int64             `json:"time_stamp"`
+	Host       string            `json:"host"`
+	Tags       map[string]string `json:"tags,omitempty"`
+}
diff --git a/pkg/collector/factory.go b/pkg/collector/factory.go
index 61563ff..7d61c2c 100644
--- a/pkg/collector/factory.go
+++ b/pkg/collector/factory.go
@@ -11,14 +11,25 @@ import (
 	"github.com/hyperpilotio/node-agent/pkg/collector/psutil"
 	"github.com/hyperpilotio/node-agent/pkg/collector/use"
 	"github.com/hyperpilotio/node-agent/pkg/snap"
+	"github.com/hyperpilotio/node-agent/pkg/collector/ddagent"
 )
 
 // Collector is a plugin which is the source of new data in the Snap pipeline.
 type Collector interface {
 	GetMetricTypes(snap.Config) ([]snap.Metric, error)
+}
+
+type NormalCollector interface {
+	Collector
 	CollectMetrics([]snap.Metric) ([]snap.Metric, error)
 }
 
+type StreamCollector interface {
+	Collector
+	StreamMetrics() error
+	Metrics() chan []snap.Metric
+}
+
 func NewCollector(name string) (Collector, error) {
 	switch name {
 	case "cpu":
@@ -35,6 +46,8 @@ func NewCollector(name string) (Collector, error) {
 		return use.New()
 	case "goddd":
 		return goddd.New()
+	case "ddagent":
+		return ddagent.New()
 	default:
 		return nil, errors.New("Unsupported collector type: " + name)
 	}
diff --git a/pkg/common/task_model.go b/pkg/common/task_model.go
index 92c539e..e675a14 100644
--- a/pkg/common/task_model.go
+++ b/pkg/common/task_model.go
@@ -36,7 +36,7 @@ type Publish struct {
 
 type NodeTask struct {
 	Id       string    `json:"id"`
-	Schedule Schedule  `json:"schedule"`
+	Schedule Schedule  `json:"schedule,omitempty"`
 	Collect  *Collect  `json:"collect"`
 	Process  *Process  `json:"process"`
 	Analyze  *Analyze  `json:"analyze"`
diff --git a/pkg/common/util.go b/pkg/common/util.go
new file mode 100644
index 0000000..e750414
--- /dev/null
+++ b/pkg/common/util.go
@@ -0,0 +1,9 @@
+package common
+
+import "reflect"
+
+// IsInstanceOf reports whether objectPtr's concrete type implements the
+// interface pointed to by typePtr, e.g. IsInstanceOf(coll, (*collector.NormalCollector)(nil)).
+func IsInstanceOf(objectPtr, typePtr interface{}) bool {
+	return reflect.TypeOf(objectPtr).Implements(reflect.TypeOf(typePtr).Elem())
+}
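Note (not part of the patch): a quick way to exercise the new ddagent stream collector is to write a single newline-delimited JSON metric to its TCP listener. The sketch below is illustrative only; it assumes the agent is running locally with conf/ddagent-task.json (which configures port 8877; NewTCPListener defaults to 8888), and the metric name, value, host, and tags are made-up sample data shaped like pkg/collector/ddagent/types.go (json fields "metric", "value", "time_stamp", "host", "tags").

package main

import (
	"encoding/json"
	"fmt"
	"net"
	"time"
)

// ddMetric mirrors the ddagent.Metric wire format from types.go.
type ddMetric struct {
	MetricName string            `json:"metric"`
	Value      string            `json:"value"`
	Timestamp  int64             `json:"time_stamp"`
	Host       string            `json:"host"`
	Tags       map[string]string `json:"tags,omitempty"`
}

func main() {
	// Assumed address: agent running locally with the ddagent task config (port 8877).
	conn, err := net.Dial("tcp", "127.0.0.1:8877")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	m := ddMetric{
		MetricName: "system.cpu.user", // hypothetical metric name
		Value:      "12.5",            // ddagent.Metric carries the value as a string
		Timestamp:  time.Now().Unix(),
		Host:       "test-host",
		Tags:       map[string]string{"env": "dev"},
	}
	payload, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}
	// The TCP listener reads newline-delimited lines, so terminate with '\n'.
	fmt.Fprintf(conn, "%s\n", payload)
}

The same payload can also be pushed from inside the local Docker image with the netcat binary added to the Dockerfile in this diff, e.g. by piping the JSON line into nc on the configured port.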