| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- package main
- import (
- "bytes"
- "github.com/cilium/ebpf/rlimit"
- "github.com/coroot/coroot-node-agent/utils"
- "github.com/coroot/coroot-node-agent/utils/enums"
- log "github.com/sirupsen/logrus"
- "net/http"
- _ "net/http/pprof"
- "os"
- "path"
- "runtime"
- "strings"
- "encoding/json"
- dto "github.com/prometheus/client_model/go"
- "github.com/coroot/coroot-node-agent/common"
- "github.com/coroot/coroot-node-agent/containers"
- "github.com/coroot/coroot-node-agent/flags"
- "github.com/coroot/coroot-node-agent/logs"
- "github.com/coroot/coroot-node-agent/node"
- "github.com/coroot/coroot-node-agent/prom"
- "github.com/coroot/coroot-node-agent/tracing"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "golang.org/x/mod/semver"
- "golang.org/x/sys/unix"
- "golang.org/x/time/rate"
- )
// version identifies the agent build. main logs it at startup and exports it
// through the node_agent_info metric. It stays "unknown" unless something
// replaces it (presumably the linker via -ldflags -X; confirm against the
// build scripts).
var version = "unknown"

// minSupportedKernelVersion is the lowest Linux kernel version (major.minor)
// on which main agrees to start.
const minSupportedKernelVersion = "4.18"
- func init() {
- logs.FormatterInit()
- }
- func uname() (string, string, error) {
- runtime.LockOSThread()
- defer runtime.UnlockOSThread()
- f, err := os.Open("/proc/1/ns/uts")
- if err != nil {
- return "", "", err
- }
- defer f.Close()
- self, err := os.Open("/proc/self/ns/uts")
- if err != nil {
- return "", "", err
- }
- defer self.Close()
- defer func() {
- unix.Setns(int(self.Fd()), unix.CLONE_NEWUTS)
- }()
- err = unix.Setns(int(f.Fd()), unix.CLONE_NEWUTS)
- if err != nil {
- return "", "", err
- }
- var utsname unix.Utsname
- if err := unix.Uname(&utsname); err != nil {
- return "", "", err
- }
- hostname := string(bytes.Split(utsname.Nodename[:], []byte{0})[0])
- kernelVersion := string(bytes.Split(utsname.Release[:], []byte{0})[0])
- return hostname, kernelVersion, nil
- }
- func machineID() string {
- for _, p := range []string{"sys/devices/virtual/dmi/id/product_uuid", "etc/machine-id", "var/lib/dbus/machine-id"} {
- payload, err := os.ReadFile(path.Join("/proc/1/root", p))
- if err != nil {
- log.Warningln("failed to read machine-id:", err)
- continue
- }
- id := strings.TrimSpace(strings.Replace(string(payload), "-", "", -1))
- log.Infoln("machine-id: ", id)
- return id
- }
- return ""
- }
- func whitelistNodeExternalNetworks() {
- netdevs, err := node.NetDevices()
- if err != nil {
- log.Warningln("failed to get network interfaces:", err)
- return
- }
- for _, iface := range netdevs {
- for _, p := range iface.IPPrefixes {
- if p.IP().IsLoopback() || common.IsIpPrivate(p.IP()) {
- continue
- }
- // if the node has an external network IP, whitelist that network
- common.ConnectionFilter.WhitelistPrefix(p)
- }
- }
- }
// JSON shapes produced by the /metrics2 endpoint registered in main.
type (
	// MetricItemData is a single sample of a metric: its label set and its
	// value.
	MetricItemData struct {
		// Label maps label names to label values; encoded as "metric_tags".
		Label map[string]string `json:"metric_tags"`
		// Value is the sample value; encoded as "value".
		Value any `json:"value"`
	}

	// MetricData groups all samples that share one metric key.
	MetricData struct {
		// MetricKey is the name the samples are published under.
		MetricKey string `json:"metric_key"`
		// Metric lists the samples recorded for MetricKey.
		Metric []MetricItemData `json:"metric"`
	}
)
- func main() {
- runtime.GOMAXPROCS(1)
- err := logs.InitLog(*flags.LogLevel, logs.LogConfig{
- Path: utils.GetDefaultLogPath(),
- AppInfo: enums.DaemonProc,
- MaxSize: 50, // 日志文件最大尺寸,单位MB
- MaxBackups: 3, // 最多保留的旧日志文件数
- MaxAge: 3, // 日志文件保留的最长时间,单位天
- Console: true,
- })
- if err != nil {
- log.WithError(err).Errorf("log init error.")
- }
- if err := rlimit.RemoveMemlock(); err != nil {
- log.WithError(err).Warning("Failed Removing memlock.")
- } else {
- log.Info("Rlimit removed")
- }
- //log.LogToStderr(false)
- //log.SetOutput(&RateLimitedLogOutput{limiter: rate.NewLimiter(rate.Limit(*flags.LogPerSecond), *flags.LogBurst)})
- log.Infoln("agent version:", version)
- hostname, kv, err := uname()
- if err != nil {
- log.Fatalln("failed to get uname:", err)
- }
- log.Infoln("hostname:", hostname)
- log.Infoln("kernel version:", kv)
- // 构建节点信息
- nodeInfo, err := node.NewNodeInfo(hostname, kv)
- if err != nil || nodeInfo == nil {
- log.Fatalln(err)
- }
- log.Infof("node info %s", utils.ToString(nodeInfo))
- ver := common.KernelMajorMinor(kv)
- if ver == "" {
- log.Fatalln("invalid kernel version:", kv)
- }
- if semver.Compare("v"+ver, "v"+minSupportedKernelVersion) == -1 {
- log.Fatalf("the minimum Linux kernel version required is %s or later", minSupportedKernelVersion)
- }
- whitelistNodeExternalNetworks()
- machineId := nodeInfo.GetNodeInfo().SystemUUID
- tracing.Init(machineId, hostname, version)
- logs.Init(machineId, hostname, version)
- registry := prometheus.NewRegistry()
- registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry)
- registerer.MustRegister(info("node_agent_info", version))
- if err := registerer.Register(node.NewCollector(hostname, kv)); err != nil {
- log.Fatalln(err)
- }
- //processInfoCh := profiling.Init(machineId, hostname)
- cr, err := containers.NewRegistry(registerer, kv, nodeInfo, nil)
- if err != nil {
- log.Fatalln(err)
- }
- defer cr.Close()
- log.Infoln("START_TRACE")
- //profiling.Start()
- //defer profiling.Stop()
- // 创建一个/metrics路由处理函数
- metricsHandler := func(w http.ResponseWriter, r *http.Request) {
- // 从注册表中获取指标数据
- metrics, err := registry.Gather()
- if err != nil {
- // 错误处理
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- var Data []MetricData
- for _, metric := range metrics {
- if metric.GetName() != "container_net_tcp_successful_connects_total" &&
- metric.GetName() != "container_net_tcp_failed_connects_total" &&
- metric.GetName() != "container_net_tcp_retransmits_total" &&
- metric.GetName() != "container_net_tcp_listen_info" &&
- metric.GetName() != "container_http_requests_total" &&
- metric.GetName() != "container_http_requests_duration_seconds_total" &&
- metric.GetName() != "container_application_type" {
- continue
- }
- var item MetricData
- var itemOther MetricData
- item.MetricKey = metric.GetName()
- for _, m := range metric.GetMetric() {
- metricItem := MetricItemData{}
- label := make(map[string]string)
- for _, l := range m.GetLabel() {
- label[l.GetName()] = l.GetValue()
- }
- metricItem.Label = label
- switch metric.GetType() {
- case dto.MetricType_COUNTER:
- metricItem.Value = m.GetCounter().GetValue()
- item.Metric = append(item.Metric, metricItem)
- case dto.MetricType_GAUGE:
- metricItem.Value = m.GetGauge().GetValue()
- item.Metric = append(item.Metric, metricItem)
- case dto.MetricType_HISTOGRAM:
- item.MetricKey = metric.GetName() + "_sum"
- metricItem.Value = m.GetHistogram().GetSampleSum()
- item.Metric = append(item.Metric, metricItem)
- metricItemOther := MetricItemData{}
- metricItemOther.Label = label
- itemOther.MetricKey = metric.GetName() + "_count"
- metricItemOther.Value = m.GetHistogram().GetSampleCount()
- itemOther.Metric = append(itemOther.Metric, metricItemOther)
- default:
- continue
- }
- }
- Data = append(Data, item)
- if metric.GetType() == dto.MetricType_HISTOGRAM {
- Data = append(Data, itemOther)
- }
- }
- // 将指标数据转换为JSON格式
- jsonData, err := json.Marshal(Data)
- //jsonData, err := json.Marshal(metrics)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- w.Write(jsonData)
- }
- if err := prom.StartAgent(machineId); err != nil {
- log.Fatalln(err)
- }
- http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: logger{}, Registry: registerer}))
- http.HandleFunc("/metrics2", metricsHandler)
- log.Infoln("listening on:", *flags.ListenAddress)
- log.Errorln(http.ListenAndServe(*flags.ListenAddress, nil))
- }
- func info(name, version string) prometheus.Collector {
- g := prometheus.NewGauge(prometheus.GaugeOpts{
- Name: name,
- ConstLabels: prometheus.Labels{"version": version},
- })
- g.Set(1)
- return g
- }
- type logger struct{}
- func (l logger) Println(v ...interface{}) {
- log.Errorln(v...)
- }
- type RateLimitedLogOutput struct {
- limiter *rate.Limiter
- }
- func (o *RateLimitedLogOutput) Write(data []byte) (int, error) {
- if !o.limiter.Allow() {
- return len(data), nil
- }
- return os.Stderr.Write(data)
- }
|