package main import ( "bytes" "fmt" "net/http" _ "net/http/pprof" "os" "path" "runtime" "strings" "encoding/json" "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/config" "github.com/coroot/coroot-node-agent/containers" "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/node" "github.com/coroot/coroot-node-agent/prom" "github.com/coroot/coroot-node-agent/tracing" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" dto "github.com/prometheus/client_model/go" "golang.org/x/mod/semver" "golang.org/x/sys/unix" "golang.org/x/time/rate" "k8s.io/klog/v2" ) var ( version = "unknown" ) const minSupportedKernelVersion = "4.16" func uname() (string, string, error) { runtime.LockOSThread() defer runtime.UnlockOSThread() f, err := os.Open("/proc/1/ns/uts") if err != nil { return "", "", err } defer f.Close() self, err := os.Open("/proc/self/ns/uts") if err != nil { return "", "", err } defer self.Close() defer func() { unix.Setns(int(self.Fd()), unix.CLONE_NEWUTS) }() err = unix.Setns(int(f.Fd()), unix.CLONE_NEWUTS) if err != nil { return "", "", err } var utsname unix.Utsname if err := unix.Uname(&utsname); err != nil { return "", "", err } hostname := string(bytes.Split(utsname.Nodename[:], []byte{0})[0]) kernelVersion := string(bytes.Split(utsname.Release[:], []byte{0})[0]) return hostname, kernelVersion, nil } func machineID() string { for _, p := range []string{"sys/devices/virtual/dmi/id/product_uuid", "etc/machine-id", "var/lib/dbus/machine-id"} { payload, err := os.ReadFile(path.Join("/proc/1/root", p)) if err != nil { klog.Warningln("failed to read machine-id:", err) continue } id := strings.TrimSpace(strings.Replace(string(payload), "-", "", -1)) klog.Infoln("machine-id: ", id) return id } return "" } func whitelistNodeExternalNetworks() { netdevs, err := node.NetDevices() if err != nil { klog.Warningln("failed to get network interfaces:", err) return } for _, iface := range netdevs { for _, p := range iface.IPPrefixes { if p.IP().IsLoopback() || common.IsIpPrivate(p.IP()) { continue } // if the node has an external network IP, whitelist that network // IK: 如果节点具有外部网络IP,则将该网络列入白名单... common.ConnectionFilter.WhitelistPrefix(p) } } } type MetricItemData struct { Label map[string]string `json:"metric_tags"` Value any `json:"value"` } type MetricData struct { MetricKey string `json:"metric_key"` Metric []MetricItemData `json:"metric"` } func main() { // IK: 增加配置文件 agentConfig, err := config.LoadConfig() if err != nil { fmt.Println(err) return } fmt.Printf("License=%+v\r\n", agentConfig) // IK: 限制资源 maxProcs := agentConfig.ResourceConfig.ResourceMaxProcs if maxProcs <= 0 { maxProcs = 1 } runtime.GOMAXPROCS(maxProcs) // IK: 初始化日志组件, klog是将日志以http方式发送到server端, 支持配置LogPerSecond等配置. klog.LogToStderr(false) klog.SetOutput(&RateLimitedLogOutput{limiter: rate.NewLimiter(rate.Limit(*flags.LogPerSecond), *flags.LogBurst)}) klog.Infoln("agent version:", version) hostname, kv, err := uname() // IK: 获取主机名称,内核版本... if err != nil { klog.Exitln("failed to get uname:", err) } klog.Infoln("hostname:", hostname) klog.Infoln("kernel version:", kv) ver := common.KernelMajorMinor(kv) if ver == "" { klog.Exitln("invalid kernel version:", kv) } if semver.Compare("v"+ver, "v"+minSupportedKernelVersion) == -1 { klog.Exitf("the minimum Linux kernel version required is %s or later", minSupportedKernelVersion) } whitelistNodeExternalNetworks() // IK:TODO: 将外部网络放到白名单??? machineId := machineID() tracing.Init(machineId, hostname, version) // IK:TODO: 初始化发送??? logs.Init(machineId, hostname, version) registry := prometheus.NewRegistry() registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry) registerer.MustRegister(info("node_agent_info", version)) if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil { klog.Exitln(err) } //processInfoCh := profiling.Init(machineId, hostname) cr, err := containers.NewRegistry(registerer, kv, agentConfig, nil) // IK: 核心代码 if err != nil { klog.Exitln(err) } defer cr.Close() klog.Infoln("START_TRACE") //profiling.Start() //defer profiling.Stop() // 创建一个/metrics路由处理函数 metricsHandler := func(w http.ResponseWriter, r *http.Request) { // 从注册表中获取指标数据 metrics, err := registry.Gather() if err != nil { // 错误处理 http.Error(w, err.Error(), http.StatusInternalServerError) return } var Data []MetricData for _, metric := range metrics { if metric.GetName() != "container_net_tcp_successful_connects_total" && metric.GetName() != "container_net_tcp_failed_connects_total" && metric.GetName() != "container_net_tcp_retransmits_total" && metric.GetName() != "container_net_tcp_listen_info" && metric.GetName() != "container_http_requests_total" && metric.GetName() != "container_http_requests_duration_seconds_total" && metric.GetName() != "container_application_type" { continue } var item MetricData var itemOther MetricData item.MetricKey = metric.GetName() for _, m := range metric.GetMetric() { metricItem := MetricItemData{} label := make(map[string]string) for _, l := range m.GetLabel() { label[l.GetName()] = l.GetValue() } metricItem.Label = label switch metric.GetType() { case dto.MetricType_COUNTER: metricItem.Value = m.GetCounter().GetValue() item.Metric = append(item.Metric, metricItem) case dto.MetricType_GAUGE: metricItem.Value = m.GetGauge().GetValue() item.Metric = append(item.Metric, metricItem) case dto.MetricType_HISTOGRAM: item.MetricKey = metric.GetName() + "_sum" metricItem.Value = m.GetHistogram().GetSampleSum() item.Metric = append(item.Metric, metricItem) metricItemOther := MetricItemData{} metricItemOther.Label = label itemOther.MetricKey = metric.GetName() + "_count" metricItemOther.Value = m.GetHistogram().GetSampleCount() itemOther.Metric = append(itemOther.Metric, metricItemOther) default: continue } } Data = append(Data, item) if metric.GetType() == dto.MetricType_HISTOGRAM { Data = append(Data, itemOther) } } // 将指标数据转换为JSON格式 jsonData, err := json.Marshal(Data) //jsonData, err := json.Marshal(metrics) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(jsonData) } if err := prom.StartAgent(machineId); err != nil { klog.Exitln(err) } http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: logger{}, Registry: registerer})) http.HandleFunc("/metrics2", metricsHandler) klog.Infoln("listening on:", *flags.ListenAddress) klog.Errorln(http.ListenAndServe(*flags.ListenAddress, nil)) } func info(name, version string) prometheus.Collector { g := prometheus.NewGauge(prometheus.GaugeOpts{ Name: name, ConstLabels: prometheus.Labels{"version": version}, }) g.Set(1) return g } type logger struct{} func (l logger) Println(v ...interface{}) { klog.Errorln(v...) } type RateLimitedLogOutput struct { limiter *rate.Limiter } func (o *RateLimitedLogOutput) Write(data []byte) (int, error) { if !o.limiter.Allow() { return len(data), nil } return os.Stderr.Write(data) }