| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584 |
- package main
- import (
- "bytes"
- "encoding/json"
- "io"
- "net/http"
- _ "net/http/pprof"
- "os"
- "os/signal"
- "path"
- "path/filepath"
- "syscall"
- "github.com/cilium/ebpf/rlimit"
- "github.com/coroot/coroot-node-agent/kube"
- "github.com/coroot/coroot-node-agent/utils"
- "github.com/coroot/coroot-node-agent/utils/namedpipe"
- "github.com/coroot/coroot-node-agent/utils/try"
- dto "github.com/prometheus/client_model/go"
- log "github.com/sirupsen/logrus"
- // "regexp"
- "runtime"
- "strconv"
- "strings"
- "time"
- "github.com/coroot/coroot-node-agent/common"
- "github.com/coroot/coroot-node-agent/config"
- "github.com/coroot/coroot-node-agent/containers"
- "github.com/coroot/coroot-node-agent/flags"
- "github.com/coroot/coroot-node-agent/logs"
- "github.com/coroot/coroot-node-agent/node"
- "github.com/coroot/coroot-node-agent/tracing"
- "github.com/prometheus/client_golang/prometheus"
- "golang.org/x/mod/semver"
- "golang.org/x/sys/unix"
- "golang.org/x/time/rate"
- )
- var (
- version = flags.AgentVersion
- sendNetDataInterval = 1 * time.Minute
- )
- const minSupportedKernelVersion = "4.18"
- func init() {
- logs.FormatterInit()
- }
- func uname() (string, string, error) {
- runtime.LockOSThread()
- defer runtime.UnlockOSThread()
- f, err := os.Open("/proc/1/ns/uts")
- if err != nil {
- return "", "", err
- }
- defer f.Close()
- self, err := os.Open("/proc/self/ns/uts")
- if err != nil {
- return "", "", err
- }
- defer self.Close()
- defer func() {
- unix.Setns(int(self.Fd()), unix.CLONE_NEWUTS)
- }()
- err = unix.Setns(int(f.Fd()), unix.CLONE_NEWUTS)
- if err != nil {
- return "", "", err
- }
- var utsname unix.Utsname
- if err := unix.Uname(&utsname); err != nil {
- return "", "", err
- }
- hostname := string(bytes.Split(utsname.Nodename[:], []byte{0})[0])
- kernelVersion := string(bytes.Split(utsname.Release[:], []byte{0})[0])
- return hostname, kernelVersion, nil
- }
- func machineID() string {
- for _, p := range []string{"sys/devices/virtual/dmi/id/product_uuid", "etc/machine-id", "var/lib/dbus/machine-id"} {
- payload, err := os.ReadFile(path.Join("/proc/1/root", p))
- if err != nil {
- log.Warningln("failed to read machine-id:", err)
- continue
- }
- id := strings.TrimSpace(strings.Replace(string(payload), "-", "", -1))
- log.Infoln("machine-id: ", id)
- return id
- }
- return ""
- }
- func whitelistNodeExternalNetworks() {
- netdevs, err := node.NetDevices()
- if err != nil {
- log.Warningln("failed to get network interfaces:", err)
- return
- }
- for _, iface := range netdevs {
- for _, p := range iface.IPPrefixes {
- if p.IP().IsLoopback() || common.IsIpPrivate(p.IP()) {
- continue
- }
- // if the node has an external network IP, whitelist that network
- common.ConnectionFilter.WhitelistPrefix(p)
- }
- }
- }
// MetricItemData is a single sample of a metric: its label set together with
// the sample value.
type MetricItemData struct {
	// Label maps label names to label values; serialized as "metric_tags".
	Label map[string]string `json:"metric_tags"`
	// Value is the sample value; serialized as "value".
	Value any `json:"value"`
}
- type MetricData struct {
- MetricKey string `json:"metric_key"`
- Metric []MetricItemData `json:"metric"`
- }
- type PostData struct {
- AccountID string `json:"accountId"`
- IP string `json:"ip"`
- HostID int64 `json:"hostId"`
- TimeStamp uint64 `json:"time_stamp"`
- ServiceType uint64 `json:"service_type"`
- HostName string `json:"host_name"`
- Data []MetricData `json:"data"`
- }
- func main() {
- runtime.GOMAXPROCS(1)
- // 初始化配置文件(支持热加载)
- // 支持三种方式指定配置文件:
- // 1. 命令行参数 --config
- // 2. 环境变量 EUSPACE_CONFIG
- // 3. 自动查找默认位置的配置文件(如果存在)
- configPath := *flags.ConfigFile
- if configPath == "" {
- configPath = os.Getenv("EUSPACE_CONFIG")
- }
- var err error
- if err = config.InitConfig(configPath); err != nil {
- log.Warnf("Failed to initialize config: %v, using command line flags and environment variables", err)
- }
- // 使用 Config 管理器初始化日志系统(自动处理优先级)
- if err = config.InitLogFromConfig(); err != nil {
- log.WithError(err).Errorf("log init error.")
- }
- if err = rlimit.RemoveMemlock(); err != nil {
- log.WithError(err).Warning("Failed Removing memlock.")
- } else {
- log.Info("Rlimit removed")
- }
- //log.LogToStderr(false)
- //log.SetOutput(&RateLimitedLogOutput{limiter: rate.NewLimiter(rate.Limit(*flags.LogPerSecond), *flags.LogBurst)})
- log.Infoln("agent version:", version)
- if *flags.RunInContainer {
- _, err = kube.NewKubeClient()
- if err != nil {
- log.WithError(err).Errorf("Failed to init kube client.")
- }
- }
- hostname, kv, err := uname()
- if err != nil {
- log.Fatalln("failed to get uname:", err)
- }
- log.Infoln("hostname:", hostname)
- log.Infoln("kernel version:", kv)
- // 构建节点信息
- nodeInfo, err := node.NewNodeInfo(hostname, kv, version)
- if err != nil || nodeInfo == nil {
- log.Fatalln(err)
- }
- log.Infof("node info %s", utils.ToString(nodeInfo))
- ver := common.KernelMajorMinor(kv)
- if ver == "" {
- log.Fatalln("invalid kernel version:", kv)
- }
- if semver.Compare("v"+ver, "v"+minSupportedKernelVersion) == -1 {
- log.Fatalf("the minimum Linux kernel version required is %s or later", minSupportedKernelVersion)
- }
- whitelistNodeExternalNetworks()
- SystemUUID := nodeInfo.GetNodeInfo().SystemUUID
- tracing.Init(SystemUUID, hostname, version)
- logs.Init(SystemUUID, hostname, version)
- registry := prometheus.NewRegistry()
- registerer := prometheus.WrapRegistererWith(prometheus.Labels{"system_uuid": SystemUUID}, registry)
- registerer.MustRegister(info("node_agent_info", version))
- if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil {
- log.Fatalln(err)
- }
- //processInfoCh := profiling.Init(machineId, hostname)
- cr, err := containers.NewRegistry(registerer, kv, nodeInfo, nil)
- if err != nil {
- log.Fatalln(err)
- }
- //defer cr.Close()
- log.Infoln("START_TRACE")
- if *flags.RunInOmniagent {
- //namedpipe初始化
- npCtl, err := namedpipe.NewNamedPipeCtl(nil)
- if err != nil {
- log.Errorf("get namedpipeCtl occurs error: %s", err.Error())
- } else {
- //监听&处理-熔断信号
- npCtl.AcceptAndDisposeMsg(cr)
- }
- }
- //heartbeat
- try.GoParams(containers.DoHeartbeat, utils.CatchFn, filepath.Join(utils.GetRootPath(), "heartbeat"))
- //profiling.Start()
- //defer profiling.Stop()
- // 创建一个/metrics路由处理函数
- sendNetDataDone := make(chan struct{})
- if *flags.SendNetData {
- sendNetDataFunc := func() {
- // 从注册表中获取指标数据
- metrics, err := registry.Gather()
- if err != nil {
- // 错误处理
- return
- }
- // 创建正则表达式对象
- // regex, err := regexp.Compile(`^process_.+_queries_total$`)
- // if err != nil {
- // return
- // }
- var postData PostData
- postData.AccountID = strconv.Itoa(nodeInfo.AccountID)
- postData.IP = nodeInfo.HostIp
- postData.HostID = nodeInfo.HostID
- postData.TimeStamp = uint64(time.Now().UnixNano())
- postData.ServiceType = 30002
- postData.HostName = nodeInfo.Hostname
- for _, metric := range metrics {
- if metric.GetName() != "process_net_tcp_successful_connects_total" &&
- metric.GetName() != "process_net_tcp_failed_connects_total" &&
- metric.GetName() != "process_net_tcp_retransmits_total" &&
- metric.GetName() != "process_net_tcp_listen_info" &&
- metric.GetName() != "process_http_requests_total" &&
- metric.GetName() != "process_http_requests_duration_seconds_total" &&
- metric.GetName() != "process_http_requests_duration_seconds_total_count" &&
- metric.GetName() != "process_mysql_queries_total" &&
- metric.GetName() != "process_mysql_queries_duration_seconds_total" &&
- // metric.GetName() != "process_mysql_queries_duration_seconds_total_count" &&
- metric.GetName() != "process_redis_queries_total" &&
- metric.GetName() != "process_redis_queries_duration_seconds_total" &&
- // metric.GetName() != "process_redis_queries_duration_seconds_total_count" &&
- metric.GetName() != "process_postgres_queries_total" &&
- metric.GetName() != "process_postgres_queries_duration_seconds_total" &&
- // metric.GetName() != "process_postgres_queries_duration_seconds_total_count" &&
- // regex.MatchString(metric.GetName()) == false &&
- metric.GetName() != "process_dm_queries_total" &&
- metric.GetName() != "process_dm_queries_duration_seconds_total" &&
- metric.GetName() != "process_application_type" &&
- metric.GetName() != "process_net_tcp_bytes_received_per" &&
- metric.GetName() != "process_net_tcp_bytes_sent_per" &&
- metric.GetName() != "process_net_tcp_bytes_received_total" &&
- metric.GetName() != "process_net_tcp_bytes_sent_total" &&
- metric.GetName() != "process_net_tcp_data_latency_time" &&
- metric.GetName() != "process_net_tcp_flow_duration_time" &&
- metric.GetName() != "process_net_tcp_connection_establish_time" {
- continue
- }
- var item MetricData
- var itemOther MetricData
- item.MetricKey = metric.GetName()
- for _, m := range metric.GetMetric() {
- metricItem := MetricItemData{}
- label := make(map[string]string)
- for _, l := range m.GetLabel() {
- label[l.GetName()] = l.GetValue()
- }
- metricItem.Label = label
- switch metric.GetType() {
- case dto.MetricType_COUNTER:
- metricItem.Value = m.GetCounter().GetValue()
- item.Metric = append(item.Metric, metricItem)
- case dto.MetricType_GAUGE:
- metricItem.Value = m.GetGauge().GetValue()
- item.Metric = append(item.Metric, metricItem)
- case dto.MetricType_HISTOGRAM:
- item.MetricKey = metric.GetName() + "_sum"
- metricItem.Value = m.GetHistogram().GetSampleSum()
- item.Metric = append(item.Metric, metricItem)
- metricItemOther := MetricItemData{}
- metricItemOther.Label = label
- itemOther.MetricKey = metric.GetName() + "_count"
- metricItemOther.Value = m.GetHistogram().GetSampleCount()
- itemOther.Metric = append(itemOther.Metric, metricItemOther)
- default:
- continue
- }
- }
- postData.Data = append(postData.Data, item)
- if metric.GetType() == dto.MetricType_HISTOGRAM {
- postData.Data = append(postData.Data, itemOther)
- }
- }
- // 将指标数据转换为JSON格式
- jsonData, err := json.Marshal(postData)
- //jsonData, err := json.Marshal(metrics)
- if err != nil {
- return
- }
- // log.Debugln("netdata is:", string(jsonData))
- // 创建请求
- urlRoute := "/api/v2/ebpf/receive"
- log.Infoln("send url is ", *flags.DataServer+*flags.ServerPrefix+urlRoute)
- // req, err := http.NewRequest("POST", "http://10.0.7.115:18080/api/v2/ebpf/receive", bytes.NewBuffer(jsonData))
- req, err := http.NewRequest("POST", "http://"+*flags.DataServer+*flags.ServerPrefix+urlRoute, bytes.NewBuffer(jsonData))
- if err != nil {
- log.Errorf(err.Error())
- return
- }
- // 添加 Content-Type header
- req.Header.Add("Content-Type", "application/json")
- // 添加一个自定义 header
- req.Header.Add("DataCount", strconv.Itoa(len(postData.Data)))
- req.Header.Add("Account-Id", strconv.Itoa(nodeInfo.AccountID))
- req.Header.Add("ip", nodeInfo.HostIp)
- // 创建 HTTP 客户端
- client := &http.Client{}
- // 发送 HTTP POST 请求
- response, err := client.Do(req)
- if err != nil {
- log.Errorf(err.Error())
- return
- }
- defer response.Body.Close()
- // 读取响应内容
- responseData, err := io.ReadAll(response.Body)
- if err != nil {
- log.Errorf("Error:", err)
- return
- }
- // 输出响应状态码和响应正文
- log.Infoln("Status Code:", response.StatusCode)
- log.Infoln("Response Body:", string(responseData))
- }
- try.Go(func() {
- sendNetDataTicker := time.NewTicker(sendNetDataInterval)
- defer sendNetDataTicker.Stop()
- for {
- select {
- case <-sendNetDataDone:
- return
- case _ = <-sendNetDataTicker.C:
- log.Infoln("before enter sendNetDataFunc")
- if !cr.IsFusing() {
- log.Infoln("after enter sendNetDataFunc")
- sendNetDataFunc()
- }
- }
- }
- }, utils.CatchFn)
- }
- /* metricsHandler := func(w http.ResponseWriter, r *http.Request) {
- // 从注册表中获取指标数据
- metrics, err := registry.Gather()
- if err != nil {
- // 错误处理
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- // 创建正则表达式对象
- // regex, err := regexp.Compile(`^process_.+_queries_total$`)
- // if err != nil {
- // return
- // }
- var postData PostData
- postData.AccountID = strconv.Itoa(nodeInfo.AccountID)
- postData.IP = nodeInfo.HostIp
- postData.HostID = nodeInfo.HostID
- postData.TimeStamp = uint64(time.Now().UnixNano())
- postData.ServiceType = 30002
- postData.HostName = nodeInfo.Hostname
- for _, metric := range metrics {
- if metric.GetName() != "process_net_tcp_successful_connects_total" &&
- metric.GetName() != "process_net_tcp_failed_connects_total" &&
- metric.GetName() != "process_net_tcp_retransmits_total" &&
- metric.GetName() != "process_net_tcp_listen_info" &&
- metric.GetName() != "process_http_requests_total" &&
- metric.GetName() != "process_http_requests_duration_seconds_total" &&
- metric.GetName() != "process_http_requests_duration_seconds_total_count" &&
- metric.GetName() != "process_mysql_queries_total" &&
- metric.GetName() != "process_mysql_queries_duration_seconds_total" &&
- // metric.GetName() != "process_mysql_queries_duration_seconds_total_count" &&
- metric.GetName() != "process_redis_queries_total" &&
- metric.GetName() != "process_redis_queries_duration_seconds_total" &&
- // metric.GetName() != "process_redis_queries_duration_seconds_total_count" &&
- metric.GetName() != "process_postgres_queries_total" &&
- metric.GetName() != "process_postgres_queries_duration_seconds_total" &&
- // metric.GetName() != "process_postgres_queries_duration_seconds_total_count" &&
- // regex.MatchString(metric.GetName()) == false &&
- metric.GetName() != "process_dm_queries_total" &&
- metric.GetName() != "process_dm_queries_duration_seconds_total" &&
- metric.GetName() != "process_application_type" &&
- metric.GetName() != "process_net_tcp_bytes_received_per" &&
- metric.GetName() != "process_net_tcp_bytes_sent_per" &&
- metric.GetName() != "process_net_tcp_bytes_received_total" &&
- metric.GetName() != "process_net_tcp_bytes_sent_total" &&
- metric.GetName() != "process_net_tcp_data_latency_time" &&
- metric.GetName() != "process_net_tcp_flow_duration_time" &&
- metric.GetName() != "process_net_tcp_connection_establish_time" {
- continue
- }
- var item MetricData
- var itemOther MetricData
- item.MetricKey = metric.GetName()
- for _, m := range metric.GetMetric() {
- metricItem := MetricItemData{}
- label := make(map[string]string)
- for _, l := range m.GetLabel() {
- label[l.GetName()] = l.GetValue()
- }
- metricItem.Label = label
- switch metric.GetType() {
- case dto.MetricType_COUNTER:
- metricItem.Value = m.GetCounter().GetValue()
- item.Metric = append(item.Metric, metricItem)
- case dto.MetricType_GAUGE:
- metricItem.Value = m.GetGauge().GetValue()
- item.Metric = append(item.Metric, metricItem)
- case dto.MetricType_HISTOGRAM:
- item.MetricKey = metric.GetName() + "_sum"
- metricItem.Value = m.GetHistogram().GetSampleSum()
- item.Metric = append(item.Metric, metricItem)
- metricItemOther := MetricItemData{}
- metricItemOther.Label = label
- itemOther.MetricKey = metric.GetName() + "_count"
- metricItemOther.Value = m.GetHistogram().GetSampleCount()
- itemOther.Metric = append(itemOther.Metric, metricItemOther)
- default:
- continue
- }
- }
- postData.Data = append(postData.Data, item)
- if metric.GetType() == dto.MetricType_HISTOGRAM {
- postData.Data = append(postData.Data, itemOther)
- }
- }
- // 将指标数据转换为JSON格式
- jsonData, err := json.Marshal(postData)
- //jsonData, err := json.Marshal(metrics)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- w.Write(jsonData)
- // 创建请求
- req, err := http.NewRequest("POST", "http://10.0.7.115:18080/api/v2/ebpf/receive", bytes.NewBuffer(jsonData))
- if err != nil {
- log.Errorf("Error:", err)
- return
- }
- // 添加 Content-Type header
- req.Header.Add("Content-Type", "application/json")
- // 添加一个自定义 header
- req.Header.Add("DataCount", strconv.Itoa(len(postData.Data)))
- req.Header.Add("Account-Id", strconv.Itoa(nodeInfo.AccountID))
- req.Header.Add("ip", nodeInfo.HostIp)
- // 创建 HTTP 客户端
- client := &http.Client{}
- // 发送 HTTP POST 请求
- response, err := client.Do(req)
- if err != nil {
- log.Errorf("Error:", err)
- return
- }
- defer response.Body.Close()
- // 读取响应内容
- responseData, err := io.ReadAll(response.Body)
- if err != nil {
- log.Errorf("Error:", err)
- return
- }
- // 输出响应状态码和响应正文
- log.Debugln("Status Code:", response.StatusCode)
- log.Infoln("Response Body:", string(responseData))
- }*/
- /*if err := prom.StartAgent(SystemUUID); err != nil {
- log.Fatalln(err)
- }
- http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: logger{}, Registry: registerer}))
- http.HandleFunc("/metrics2", metricsHandler)
- log.Infoln("listening on:", *flags.ListenAddress)
- log.Errorln(http.ListenAndServe(*flags.ListenAddress, nil))*/
- sigs := make(chan os.Signal, 1)
- signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
- done := make(chan struct{})
- try.Go(func() {
- <-sigs
- log.Infoln("Signal received, shutting down...")
- cr.Close()
- close(sendNetDataDone)
- close(done)
- }, utils.CatchFn)
- select {
- case <-done:
- log.Infoln(flags.AgentName + " exited successfully.")
- }
- }
- func info(name, version string) prometheus.Collector {
- g := prometheus.NewGauge(prometheus.GaugeOpts{
- Name: name,
- ConstLabels: prometheus.Labels{"version": version},
- })
- g.Set(1)
- return g
- }
- type logger struct{}
- func (l logger) Println(v ...interface{}) {
- log.Errorln(v...)
- }
- type RateLimitedLogOutput struct {
- limiter *rate.Limiter
- }
- func (o *RateLimitedLogOutput) Write(data []byte) (int, error) {
- if !o.limiter.Allow() {
- return len(data), nil
- }
- return os.Stderr.Write(data)
- }
|