package main import ( "bytes" "github.com/coroot/coroot-node-agent/utils" "github.com/coroot/coroot-node-agent/utils/enums" log "github.com/sirupsen/logrus" "net/http" _ "net/http/pprof" "os" "path" "runtime" "strings" "encoding/json" dto "github.com/prometheus/client_model/go" "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/containers" "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/node" "github.com/coroot/coroot-node-agent/prom" "github.com/coroot/coroot-node-agent/tracing" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/mod/semver" "golang.org/x/sys/unix" "golang.org/x/time/rate" ) var ( version = "unknown" ) const minSupportedKernelVersion = "4.16" func init() { logs.FormatterInit() } func uname() (string, string, error) { runtime.LockOSThread() defer runtime.UnlockOSThread() f, err := os.Open("/proc/1/ns/uts") if err != nil { return "", "", err } defer f.Close() self, err := os.Open("/proc/self/ns/uts") if err != nil { return "", "", err } defer self.Close() defer func() { unix.Setns(int(self.Fd()), unix.CLONE_NEWUTS) }() err = unix.Setns(int(f.Fd()), unix.CLONE_NEWUTS) if err != nil { return "", "", err } var utsname unix.Utsname if err := unix.Uname(&utsname); err != nil { return "", "", err } hostname := string(bytes.Split(utsname.Nodename[:], []byte{0})[0]) kernelVersion := string(bytes.Split(utsname.Release[:], []byte{0})[0]) return hostname, kernelVersion, nil } func machineID() string { for _, p := range []string{"sys/devices/virtual/dmi/id/product_uuid", "etc/machine-id", "var/lib/dbus/machine-id"} { payload, err := os.ReadFile(path.Join("/proc/1/root", p)) if err != nil { log.Warningln("failed to read machine-id:", err) continue } id := strings.TrimSpace(strings.Replace(string(payload), "-", "", -1)) log.Infoln("machine-id: ", id) return id } return "" } func whitelistNodeExternalNetworks() { netdevs, err := node.NetDevices() if err != nil { log.Warningln("failed to get network interfaces:", err) return } for _, iface := range netdevs { for _, p := range iface.IPPrefixes { if p.IP().IsLoopback() || common.IsIpPrivate(p.IP()) { continue } // if the node has an external network IP, whitelist that network common.ConnectionFilter.WhitelistPrefix(p) } } } type MetricItemData struct { Label map[string]string `json:"metric_tags"` Value any `json:"value"` } type MetricData struct { MetricKey string `json:"metric_key"` Metric []MetricItemData `json:"metric"` } func main() { err := logs.InitLog(*flags.LogLevel, logs.LogConfig{ Path: utils.GetDefaultLogPath(), AppInfo: enums.DaemonProc, MaxSize: 50, // 日志文件最大尺寸,单位MB MaxBackups: 3, // 最多保留的旧日志文件数 MaxAge: 3, // 日志文件保留的最长时间,单位天 Console: true, }) if err != nil { log.WithError(err).Errorf("log init error.") } //log.LogToStderr(false) //log.SetOutput(&RateLimitedLogOutput{limiter: rate.NewLimiter(rate.Limit(*flags.LogPerSecond), *flags.LogBurst)}) log.Infoln("agent version:", version) hostname, kv, err := uname() if err != nil { log.Fatalln("failed to get uname:", err) } log.Infoln("hostname:", hostname) log.Infoln("kernel version:", kv) ver := common.KernelMajorMinor(kv) if ver == "" { log.Fatalln("invalid kernel version:", kv) } if semver.Compare("v"+ver, "v"+minSupportedKernelVersion) == -1 { log.Fatalf("the minimum Linux kernel version required is %s or later", minSupportedKernelVersion) } whitelistNodeExternalNetworks() machineId := utils.MachineID() tracing.Init(machineId, hostname, version) logs.Init(machineId, hostname, version) registry := prometheus.NewRegistry() registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry) registerer.MustRegister(info("node_agent_info", version)) if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil { log.Fatalln(err) } //processInfoCh := profiling.Init(machineId, hostname) nodeInfo, err := node.NewNodeInfo(hostname, kv, machineId) if err != nil || nodeInfo == nil { log.Fatalln(err) } cr, err := containers.NewRegistry(registerer, kv, nodeInfo, nil) if err != nil { log.Fatalln(err) } defer cr.Close() log.Infoln("START_TRACE") //profiling.Start() //defer profiling.Stop() // 创建一个/metrics路由处理函数 metricsHandler := func(w http.ResponseWriter, r *http.Request) { // 从注册表中获取指标数据 metrics, err := registry.Gather() if err != nil { // 错误处理 http.Error(w, err.Error(), http.StatusInternalServerError) return } var Data []MetricData for _, metric := range metrics { if metric.GetName() != "container_net_tcp_successful_connects_total" && metric.GetName() != "container_net_tcp_failed_connects_total" && metric.GetName() != "container_net_tcp_retransmits_total" && metric.GetName() != "container_net_tcp_listen_info" && metric.GetName() != "container_http_requests_total" && metric.GetName() != "container_http_requests_duration_seconds_total" && metric.GetName() != "container_application_type" { continue } var item MetricData var itemOther MetricData item.MetricKey = metric.GetName() for _, m := range metric.GetMetric() { metricItem := MetricItemData{} label := make(map[string]string) for _, l := range m.GetLabel() { label[l.GetName()] = l.GetValue() } metricItem.Label = label switch metric.GetType() { case dto.MetricType_COUNTER: metricItem.Value = m.GetCounter().GetValue() item.Metric = append(item.Metric, metricItem) case dto.MetricType_GAUGE: metricItem.Value = m.GetGauge().GetValue() item.Metric = append(item.Metric, metricItem) case dto.MetricType_HISTOGRAM: item.MetricKey = metric.GetName() + "_sum" metricItem.Value = m.GetHistogram().GetSampleSum() item.Metric = append(item.Metric, metricItem) metricItemOther := MetricItemData{} metricItemOther.Label = label itemOther.MetricKey = metric.GetName() + "_count" metricItemOther.Value = m.GetHistogram().GetSampleCount() itemOther.Metric = append(itemOther.Metric, metricItemOther) default: continue } } Data = append(Data, item) if metric.GetType() == dto.MetricType_HISTOGRAM { Data = append(Data, itemOther) } } // 将指标数据转换为JSON格式 jsonData, err := json.Marshal(Data) //jsonData, err := json.Marshal(metrics) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(jsonData) } if err := prom.StartAgent(machineId); err != nil { log.Fatalln(err) } http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: logger{}, Registry: registerer})) http.HandleFunc("/metrics2", metricsHandler) log.Infoln("listening on:", *flags.ListenAddress) log.Errorln(http.ListenAndServe(*flags.ListenAddress, nil)) } func info(name, version string) prometheus.Collector { g := prometheus.NewGauge(prometheus.GaugeOpts{ Name: name, ConstLabels: prometheus.Labels{"version": version}, }) g.Set(1) return g } type logger struct{} func (l logger) Println(v ...interface{}) { log.Errorln(v...) } type RateLimitedLogOutput struct { limiter *rate.Limiter } func (o *RateLimitedLogOutput) Write(data []byte) (int, error) { if !o.limiter.Allow() { return len(data), nil } return os.Stderr.Write(data) }