// Command main is the entry point of the node agent: it initialises logging,
// inspects the host (hostname, kernel version, node info), registers the
// Prometheus collectors and periodically pushes a filtered subset of the
// gathered metrics to the configured data server.
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"path"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/cilium/ebpf/rlimit"
	"github.com/coroot/coroot-node-agent/common"
	"github.com/coroot/coroot-node-agent/containers"
	"github.com/coroot/coroot-node-agent/flags"
	"github.com/coroot/coroot-node-agent/kube"
	"github.com/coroot/coroot-node-agent/logs"
	"github.com/coroot/coroot-node-agent/node"
	"github.com/coroot/coroot-node-agent/tracing"
	"github.com/coroot/coroot-node-agent/utils"
	"github.com/coroot/coroot-node-agent/utils/enums"
	"github.com/coroot/coroot-node-agent/utils/namedpipe"
	"github.com/coroot/coroot-node-agent/utils/try"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	log "github.com/sirupsen/logrus"
	"golang.org/x/mod/semver"
	"golang.org/x/sys/unix"
	"golang.org/x/time/rate"
)

var (
	version = flags.AgentVersion

	// sendNetDataInterval is how often the gathered metrics are pushed to the
	// data server.
	sendNetDataInterval = 1 * time.Minute

	// sendNetDataTimeout bounds a single push (connect, request and response
	// body) so that an unresponsive server cannot block the sender forever.
	sendNetDataTimeout = 30 * time.Second
)

// minSupportedKernelVersion is the lowest major.minor kernel version the
// agent agrees to start on.
const minSupportedKernelVersion = "4.18"

func init() {
	logs.FormatterInit()
}

// cString returns the bytes of b up to (not including) the first NUL byte,
// or all of b when it contains no NUL.
func cString(b []byte) string {
	if i := bytes.IndexByte(b, 0); i >= 0 {
		b = b[:i]
	}
	return string(b)
}

// uname returns the hostname and kernel release reported by uname(2) from
// inside the UTS namespace of PID 1.
//
// The calling goroutine is pinned to its OS thread for the duration of the
// call because setns(2) affects only the calling thread. The thread is moved
// back to the agent's own UTS namespace before returning.
func uname() (string, string, error) {
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()

	hostNS, err := os.Open("/proc/1/ns/uts")
	if err != nil {
		return "", "", err
	}
	defer hostNS.Close()

	selfNS, err := os.Open("/proc/self/ns/uts")
	if err != nil {
		return "", "", err
	}
	defer selfNS.Close()

	if err := unix.Setns(int(hostNS.Fd()), unix.CLONE_NEWUTS); err != nil {
		return "", "", err
	}
	// Always switch back; a failure here is reported instead of being
	// silently dropped because the thread would stay in the wrong namespace.
	defer func() {
		if err := unix.Setns(int(selfNS.Fd()), unix.CLONE_NEWUTS); err != nil {
			log.Warningln("failed to restore UTS namespace:", err)
		}
	}()

	var utsname unix.Utsname
	if err := unix.Uname(&utsname); err != nil {
		return "", "", err
	}
	return cString(utsname.Nodename[:]), cString(utsname.Release[:]), nil
}

// machineID returns the first readable machine identifier found under
// /proc/1/root, with dashes removed and surrounding whitespace trimmed.
// It returns "" when none of the candidate files can be read.
func machineID() string {
	candidates := []string{
		"sys/devices/virtual/dmi/id/product_uuid",
		"etc/machine-id",
		"var/lib/dbus/machine-id",
	}
	for _, p := range candidates {
		payload, err := os.ReadFile(path.Join("/proc/1/root", p))
		if err != nil {
			log.Warningln("failed to read machine-id:", err)
			continue
		}
		id := strings.TrimSpace(strings.ReplaceAll(string(payload), "-", ""))
		log.Infoln("machine-id: ", id)
		return id
	}
	return ""
}

// whitelistNodeExternalNetworks adds every non-loopback, non-private IP
// prefix configured on the node's network devices to the connection filter
// whitelist.
func whitelistNodeExternalNetworks() {
	netdevs, err := node.NetDevices()
	if err != nil {
		log.Warningln("failed to get network interfaces:", err)
		return
	}
	for _, iface := range netdevs {
		for _, p := range iface.IPPrefixes {
			if p.IP().IsLoopback() || common.IsIpPrivate(p.IP()) {
				continue
			}
			// if the node has an external network IP, whitelist that network
			common.ConnectionFilter.WhitelistPrefix(p)
		}
	}
}

// MetricItemData is a single sample of a metric: its label set and value.
type MetricItemData struct {
	Label map[string]string `json:"metric_tags"`
	Value any               `json:"value"`
}

// MetricData groups all samples that share one metric key.
type MetricData struct {
	MetricKey string           `json:"metric_key"`
	Metric    []MetricItemData `json:"metric"`
}

// PostData is the JSON payload pushed to the data server.
type PostData struct {
	AccountID   string       `json:"accountId"`
	IP          string       `json:"ip"`
	HostID      int64        `json:"hostId"`
	TimeStamp   uint64       `json:"time_stamp"`
	ServiceType uint64       `json:"service_type"`
	HostName    string       `json:"host_name"`
	Data        []MetricData `json:"data"`
}

// isExportedMetric reports whether the metric family with the given name is
// part of the set that is pushed to the data server.
func isExportedMetric(name string) bool {
	switch name {
	case "process_net_tcp_successful_connects_total",
		"process_net_tcp_failed_connects_total",
		"process_net_tcp_retransmits_total",
		"process_net_tcp_listen_info",
		"process_http_requests_total",
		"process_http_requests_duration_seconds_total",
		"process_http_requests_duration_seconds_total_count",
		"process_mysql_queries_total",
		"process_mysql_queries_duration_seconds_total",
		"process_redis_queries_total",
		"process_redis_queries_duration_seconds_total",
		"process_postgres_queries_total",
		"process_postgres_queries_duration_seconds_total",
		"process_dm_queries_total",
		"process_dm_queries_duration_seconds_total",
		"process_application_type",
		"process_net_tcp_bytes_received_per",
		"process_net_tcp_bytes_sent_per",
		"process_net_tcp_bytes_received_total",
		"process_net_tcp_bytes_sent_total",
		"process_net_tcp_data_latency_time",
		"process_net_tcp_flow_duration_time",
		"process_net_tcp_connection_establish_time":
		return true
	}
	return false
}

// exportMetricData converts the exported subset of the gathered metric
// families into the push payload format.
//
// Counters and gauges produce one MetricData keyed by the family name.
// Histograms produce two: "<name>_sum" carrying the sample sums and
// "<name>_count" carrying the sample counts. Samples of any other metric
// type are skipped. The result is nil when nothing is exported.
func exportMetricData(families []*dto.MetricFamily) []MetricData {
	var out []MetricData
	for _, mf := range families {
		name := mf.GetName()
		if !isExportedMetric(name) {
			continue
		}
		isHistogram := mf.GetType() == dto.MetricType_HISTOGRAM

		values := MetricData{MetricKey: name}
		var counts MetricData
		if isHistogram {
			values.MetricKey = name + "_sum"
			counts.MetricKey = name + "_count"
		}

		for _, m := range mf.GetMetric() {
			labels := make(map[string]string, len(m.GetLabel()))
			for _, l := range m.GetLabel() {
				labels[l.GetName()] = l.GetValue()
			}
			switch mf.GetType() {
			case dto.MetricType_COUNTER:
				values.Metric = append(values.Metric, MetricItemData{Label: labels, Value: m.GetCounter().GetValue()})
			case dto.MetricType_GAUGE:
				values.Metric = append(values.Metric, MetricItemData{Label: labels, Value: m.GetGauge().GetValue()})
			case dto.MetricType_HISTOGRAM:
				h := m.GetHistogram()
				values.Metric = append(values.Metric, MetricItemData{Label: labels, Value: h.GetSampleSum()})
				counts.Metric = append(counts.Metric, MetricItemData{Label: labels, Value: h.GetSampleCount()})
			}
		}

		out = append(out, values)
		if isHistogram {
			out = append(out, counts)
		}
	}
	return out
}

func main() {
	runtime.GOMAXPROCS(1)

	if err := logs.InitLog(*flags.LogLevel, logs.LogConfig{
		Path:       utils.GetDefaultLogPath(),
		AppInfo:    enums.DaemonProc,
		MaxSize:    50, // maximum size of a log file, in MB
		MaxBackups: 3,  // maximum number of old log files kept
		MaxAge:     3,  // maximum retention of a log file, in days
		Console:    true,
	}); err != nil {
		log.WithError(err).Errorf("log init error.")
	}

	if err := rlimit.RemoveMemlock(); err != nil {
		log.WithError(err).Warning("Failed Removing memlock.")
	} else {
		log.Info("Rlimit removed")
	}

	log.Infoln("agent version:", version)

	hostname, kv, err := uname()
	if err != nil {
		log.Fatalln("failed to get uname:", err)
	}
	log.Infoln("hostname:", hostname)
	log.Infoln("kernel version:", kv)

	// Build the node information.
	nodeInfo, err := node.NewNodeInfo(hostname, kv, version)
	if err != nil {
		log.Fatalln("failed to build node info:", err)
	}
	if nodeInfo == nil {
		log.Fatalln("failed to build node info: got nil node info")
	}
	log.Infof("node info %s", utils.ToString(nodeInfo))

	ver := common.KernelMajorMinor(kv)
	if ver == "" {
		log.Fatalln("invalid kernel version:", kv)
	}
	if semver.Compare("v"+ver, "v"+minSupportedKernelVersion) == -1 {
		log.Fatalf("the minimum Linux kernel version required is %s or later", minSupportedKernelVersion)
	}

	whitelistNodeExternalNetworks()

	systemUUID := nodeInfo.GetNodeInfo().SystemUUID
	tracing.Init(systemUUID, hostname, version)
	logs.Init(systemUUID, hostname, version)

	if *flags.RunInContainer {
		if _, err := kube.NewKubeClient(); err != nil {
			log.WithError(err).Errorf("Failed to init kube client.")
		}
	}

	registry := prometheus.NewRegistry()
	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"system_uuid": systemUUID}, registry)

	registerer.MustRegister(info("node_agent_info", version))

	if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil {
		log.Fatalln(err)
	}

	cr, err := containers.NewRegistry(registerer, kv, nodeInfo, nil)
	if err != nil {
		log.Fatalln(err)
	}
	log.Infoln("START_TRACE")

	if *flags.RunInOmniagent {
		// Initialise the named pipe controller.
		if npCtl, err := namedpipe.NewNamedPipeCtl(nil); err != nil {
			log.Errorf("get namedpipeCtl occurs error: %s", err.Error())
		} else {
			// Listen for and handle circuit-breaker ("fusing") messages.
			npCtl.AcceptAndDisposeMsg(cr)
		}
	}

	// Heartbeat.
	try.GoParams(containers.DoHeartbeat, utils.CatchFn, filepath.Join(utils.GetRootPath(), "heartbeat"))

	// One client is reused for every push; the timeout keeps a stalled server
	// from blocking the sender goroutine indefinitely.
	httpClient := &http.Client{Timeout: sendNetDataTimeout}

	// sendNetData gathers the registry, converts the exported metrics to
	// JSON and POSTs them to the data server. Failures are logged and the
	// push is abandoned until the next tick.
	sendNetData := func() {
		families, err := registry.Gather()
		if err != nil {
			log.WithError(err).Errorln("failed to gather metrics")
			return
		}

		accountID := strconv.Itoa(nodeInfo.AccountID)
		postData := PostData{
			AccountID:   accountID,
			IP:          nodeInfo.HostIp,
			HostID:      nodeInfo.HostID,
			TimeStamp:   uint64(time.Now().UnixNano()), // nanoseconds since the Unix epoch
			ServiceType: 30002,                         // NOTE(review): meaning of this identifier is defined by the receiver — confirm
			HostName:    nodeInfo.Hostname,
			Data:        exportMetricData(families),
		}

		// Encode the metrics as JSON.
		jsonData, err := json.Marshal(postData)
		if err != nil {
			log.WithError(err).Errorln("failed to marshal metrics")
			return
		}
		log.Infoln("netdata is:", string(jsonData))

		// Build the request.
		endpoint := *flags.DataServer + *flags.ServerPrefix + "/api/v2/ebpf/receive"
		log.Infoln("send url is ", endpoint)
		req, err := http.NewRequest("POST", endpoint, bytes.NewReader(jsonData))
		if err != nil {
			// Errorln, not Errorf: the error text must not be interpreted as
			// a format string.
			log.Errorln(err)
			return
		}
		req.Header.Add("Content-Type", "application/json")
		// Custom headers consumed by the receiver.
		req.Header.Add("DataCount", strconv.Itoa(len(postData.Data)))
		req.Header.Add("Account-Id", accountID)
		req.Header.Add("ip", nodeInfo.HostIp)

		// Send the HTTP POST request.
		response, err := httpClient.Do(req)
		if err != nil {
			log.Errorln(err)
			return
		}
		defer response.Body.Close()

		// Read the whole response so the connection can be reused.
		responseData, err := io.ReadAll(response.Body)
		if err != nil {
			log.Errorln("Error:", err)
			return
		}

		// Log the response status code and body.
		log.Infoln("Status Code:", response.StatusCode)
		log.Infoln("Response Body:", string(responseData))
	}

	// Periodic sender; stops when sendNetDataDone is closed. Pushes are
	// skipped while the container registry reports that it is fusing.
	sendNetDataDone := make(chan struct{})
	go func() {
		ticker := time.NewTicker(sendNetDataInterval)
		defer ticker.Stop()
		for {
			select {
			case <-sendNetDataDone:
				return
			case <-ticker.C:
				if !cr.IsFusing() {
					sendNetData()
				}
			}
		}
	}()

	// NOTE: no HTTP listener is started in this file. The former Prometheus
	// /metrics endpoint and the /metrics2 debug handler (a copy of
	// sendNetData with a hard-coded target) are disabled, so the handlers
	// registered by net/http/pprof are not served from here either.

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	done := make(chan struct{})
	go func() {
		<-sigs
		log.Infoln("Signal received, shutting down...")
		cr.Close()
		close(sendNetDataDone)
		close(done)
	}()

	<-done
	log.Infoln(flags.AgentName + " exited successfully.")
}

// info returns a gauge named name that is permanently set to 1 and carries
// the agent version as a constant "version" label.
func info(name, version string) prometheus.Collector {
	g := prometheus.NewGauge(prometheus.GaugeOpts{
		Name:        name,
		ConstLabels: prometheus.Labels{"version": version},
	})
	g.Set(1)
	return g
}

// logger adapts the package logger to a Println-style error logger; every
// message is emitted at error level.
type logger struct{}

// Println logs v at error level.
func (l logger) Println(v ...any) {
	log.Errorln(v...)
}

// RateLimitedLogOutput is an io.Writer that forwards writes to os.Stderr
// while the limiter allows it and silently drops them otherwise.
type RateLimitedLogOutput struct {
	limiter *rate.Limiter
}

// Write writes data to os.Stderr if the limiter permits. Dropped writes
// report len(data) and a nil error so callers do not treat them as failures.
func (o *RateLimitedLogOutput) Write(data []byte) (int, error) {
	if !o.limiter.Allow() {
		return len(data), nil
	}
	return os.Stderr.Write(data)
}