Browse Source

Fixed #TASK_QT-9810: 修改建连失败总数的统计问题。

rock 1 year ago
parent
commit
010517a11b
3 changed files with 94 additions and 47 deletions
  1. +66 -23
      containers/container.go
  2. +1 -1
      containers/metrics.go
  3. +27 -23
      main.go

+ 66 - 23
containers/container.go

@@ -90,8 +90,16 @@ type ActiveConnection struct {
 	Timestamp  uint64
 	Closed     time.Time
 
-	BytesSent     uint64
-	BytesReceived uint64
+	Retransmissions  uint64
+	BytesSent        uint64
+	PerBytesSent     uint64
+	BytesReceived    uint64
+	PerBytesReceived uint64
+
+	ConEstTime       time.Duration
+	FirstReadTime    uint64
+	FirstWriteTime   uint64
+	NewReadTime      uint64
 
 	http2Parser    *l7.Http2Parser
 	postgresParser *l7.PostgresParser
@@ -169,7 +177,8 @@ type Container struct {
 	listens map[netaddr.IPPort]map[uint32]*ListenDetails
 
 	connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
-	connectsFailed     map[netaddr.IPPort]int64      // dst -> count
+	// connectsFailed     map[netaddr.IPPort]int64      // dst -> count
+	connectsFailed     map[AddrPair]int64
 	connectLastAttempt map[netaddr.IPPort]time.Time  // dst -> time
 	connectionsActive  map[AddrPair]*ActiveConnection
 	connectionsByPidFd map[PidFd]*ActiveConnection
@@ -230,7 +239,8 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 		listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
 
 		connectsSuccessful: map[AddrPair]*ConnectionStats{},
-		connectsFailed:     map[netaddr.IPPort]int64{},
+		// connectsFailed:     map[netaddr.IPPort]int64{},
+		connectsFailed:     map[AddrPair]int64{},
 		connectLastAttempt: map[netaddr.IPPort]time.Time{},
 		connectionsActive:  map[AddrPair]*ActiveConnection{},
 		connectionsByPidFd: map[PidFd]*ActiveConnection{},
@@ -381,21 +391,21 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		targetAppId := apps[d.dst.String()]
 		ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
 		ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
-		if stats.Retransmissions > 0 {
-			ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
-		}
-		ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
-		ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
-		ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
-		ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
+		// if stats.Retransmissions > 0 {
+		// 	ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
+		// }
+		// ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
+		// ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
+		// ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
+		// ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
 
-		ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
-		ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
-		ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
+		// ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
+		// ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
+		// ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
 
-		klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
-		stats.PerBytesReceived = 0
-		stats.PerBytesSent = 0
+		// klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
+		// stats.PerBytesReceived = 0
+		// stats.PerBytesSent = 0
 	}
 
 	// for d, stats := range c.acceptsSuccessful {
@@ -405,13 +415,33 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 
 	// 	klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
 	// }
-	for dst, count := range c.connectsFailed {
-		targetAppId := apps[dst.String()]
-		ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, dst.String())
+	// for dst, count := range c.connectsFailed {
+	// 	targetAppId := apps[dst.String()]
+	// 	ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, dst.String())
+	// }
+	for addrPair, count := range c.connectsFailed {
+		targetAppId := apps[addrPair.dst.String()]
+		ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, addrPair.src.String(), addrPair.dst.String())
 	}
 
 	connections := map[AddrPair]int{}
 	for addrPair, conn := range c.connectionsActive {
+		targetAppId := apps[conn.Dest.String()]
+		if conn.Retransmissions > 0 {
+			ch <- counter(metrics.NetRetransmits, float64(conn.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+		}
+		ch <- counter(metrics.NetBytesSent, float64(conn.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+		ch <- counter(metrics.NetBytesReceived, float64(conn.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+		ch <- counter(metrics.NetBytesSentPer, float64(conn.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+		ch <- counter(metrics.NetBytesReceivedPer, float64(conn.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+
+		ch <- counter(metrics.NetDataLatency, float64(conn.FirstReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+		ch <- counter(metrics.NetDataDuration, float64(conn.NewReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+		ch <- counter(metrics.NetEstTime, float64(conn.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
+
+		// klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
+		conn.PerBytesReceived = 0
+		conn.PerBytesSent = 0
 		if !conn.Closed.IsZero() {
 			continue
 		}
@@ -695,7 +725,8 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	if failed {
-		c.connectsFailed[dst]++
+		key := AddrPair{src: dst, dst: *actualDst}
+		c.connectsFailed[key]++
 	} else {
 		key := AddrPair{src: dst, dst: *actualDst}
 		stats := c.connectsSuccessful[key]
@@ -714,6 +745,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 			Pid:        pid,
 			Fd:         fd,
 			Timestamp:  timestamp,
+			ConEstTime: duration,
 		}
 		c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
 		c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
@@ -805,15 +837,20 @@ func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, rec
 	if sent > ac.BytesSent {
 		stats.BytesSent += sent - ac.BytesSent
 		stats.PerBytesSent = sent - ac.BytesSent
+		ac.PerBytesSent = sent - ac.BytesSent
 	}
 	if received > ac.BytesReceived {
 		stats.BytesReceived += received - ac.BytesReceived
 		stats.PerBytesReceived = received - ac.BytesReceived
+		ac.PerBytesReceived = received - ac.BytesReceived
 	}
 	if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
 		stats.FirstReadTime = firstreadtime
 		stats.FirstWriteTime = firstwritetime
 		stats.NewReadTime = newreadtime
+		ac.FirstReadTime = firstreadtime
+		ac.FirstWriteTime = firstwritetime
+		ac.NewReadTime = newreadtime
 	}
 	ac.BytesSent = sent
 	ac.BytesReceived = received
@@ -960,6 +997,7 @@ func (c *Container) onRetransmission(srcDst AddrPair) bool {
 		c.connectsSuccessful[key] = stats
 	}
 	stats.Retransmissions++
+	conn.Retransmissions++
 	return true
 }
 
@@ -1114,7 +1152,7 @@ func (c *Container) ping() map[netaddr.IP]float64 {
 		ips[d.dst.IP()] = struct{}{}
 	}
 	for dst := range c.connectsFailed {
-		ips[dst.IP()] = struct{}{}
+		ips[dst.src.IP()] = struct{}{}
 	}
 	if len(ips) == 0 {
 		return nil
@@ -1286,7 +1324,12 @@ func (c *Container) gc(now time.Time) {
 		_, active := establishedDst[dst]
 		if !active && !at.IsZero() && now.Sub(at) > gcInterval {
 			delete(c.connectLastAttempt, dst)
-			delete(c.connectsFailed, dst)
+			// delete(c.connectsFailed, dst)
+			for dfailed := range c.connectsFailed {
+				if dfailed.src == dst {
+					delete(c.connectsFailed, dfailed)
+				}
+			}
 			for d := range c.connectsSuccessful {
 				if d.src == dst {
 					delete(c.connectsSuccessful, d)

+ 1 - 1
containers/metrics.go

@@ -89,7 +89,7 @@ var metrics = struct {
 	NetListenInfo:            metric("process_net_tcp_listen_info", "Listen address of the process", "listen_addr", "proxy"),
 	NetConnectionsSuccessful: metric("process_net_tcp_successful_connects_total", "Total number of successful TCP connects", "instance_id", "app_id", "target_app_id", "app_name", "src", "destination", "actual_destination"),
 	NetConnectionsTotalTime:  metric("process_net_tcp_connection_time_seconds_total", "Time spent on TCP connections", "instance_id", "app_id", "target_app_id", "app_name", "src", "destination", "actual_destination"),
-	NetConnectionsFailed:     metric("process_net_tcp_failed_connects_total", "Total number of failed TCP connects", "instance_id", "app_id","target_app_id", "app_name", "destination"),
+	NetConnectionsFailed:     metric("process_net_tcp_failed_connects_total", "Total number of failed TCP connects", "instance_id", "app_id","target_app_id", "app_name", "destination", "actual_destination"),
 	NetConnectionsActive:     metric("process_net_tcp_active_connections", "Number of active outbound connections used by the process", "destination", "actual_destination"),
 	NetRetransmits:           metric("process_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "instance_id", "app_id", "target_app_id", "app_name", "src", "destination", "actual_destination"),
 	NetLatency:               metric("process_net_latency_seconds", "Round-trip time between the process and a remote IP", "destination_ip"),

+ 27 - 23
main.go

@@ -18,7 +18,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"regexp"
+	// "regexp"
 	"runtime"
 	"strconv"
 	"strings"
@@ -236,10 +236,10 @@ func main() {
 		}
 
 		// 创建正则表达式对象
-		regex, err := regexp.Compile(`^process_.+_queries_total$`)
-		if err != nil {
-			return
-		}
+		// regex, err := regexp.Compile(`^process_.+_queries_total$`)
+		// if err != nil {
+		// 	return
+		// }
 
 		var postData PostData
 		postData.AccountID = strconv.Itoa(nodeInfo.AccountID)
@@ -256,16 +256,18 @@ func main() {
 				metric.GetName() != "process_http_requests_total" &&
 				metric.GetName() != "process_http_requests_duration_seconds_total" &&
 				metric.GetName() != "process_http_requests_duration_seconds_total_count" &&
-				// metric.GetName() != "process_mysql_queries_total" &&
-				// metric.GetName() != "process_mysql_queries_duration_seconds_total" &&
+				metric.GetName() != "process_mysql_queries_total" &&
+				metric.GetName() != "process_mysql_queries_duration_seconds_total" &&
 				// metric.GetName() != "process_mysql_queries_duration_seconds_total_count" &&
-				// metric.GetName() != "process_redis_queries_total" &&
-				// metric.GetName() != "process_redis_queries_duration_seconds_total" &&
+				metric.GetName() != "process_redis_queries_total" &&
+				metric.GetName() != "process_redis_queries_duration_seconds_total" &&
 				// metric.GetName() != "process_redis_queries_duration_seconds_total_count" &&
-				// metric.GetName() != "process_postgres_queries_total" &&
-				// metric.GetName() != "process_postgres_queries_duration_seconds_total" &&
+				metric.GetName() != "process_postgres_queries_total" &&
+				metric.GetName() != "process_postgres_queries_duration_seconds_total" &&
 				// metric.GetName() != "process_postgres_queries_duration_seconds_total_count" &&
-				regex.MatchString(metric.GetName()) == false &&
+				// regex.MatchString(metric.GetName()) == false &&
+				metric.GetName() != "process_dm_queries_total" &&
+				metric.GetName() != "process_dm_queries_duration_seconds_total" &&
 				metric.GetName() != "process_application_type" &&
 				metric.GetName() != "process_net_tcp_bytes_received_per" &&
 				metric.GetName() != "process_net_tcp_bytes_sent_per" &&
@@ -386,10 +388,10 @@ func main() {
 		}
 
 		// 创建正则表达式对象
-		regex, err := regexp.Compile(`^process_.+_queries_total$`)
-		if err != nil {
-			return
-		}
+		// regex, err := regexp.Compile(`^process_.+_queries_total$`)
+		// if err != nil {
+		// 	return
+		// }
 
 		var postData PostData
 		postData.AccountID = strconv.Itoa(nodeInfo.AccountID)
@@ -406,16 +408,18 @@ func main() {
 				metric.GetName() != "process_http_requests_total" &&
 				metric.GetName() != "process_http_requests_duration_seconds_total" &&
 				metric.GetName() != "process_http_requests_duration_seconds_total_count" &&
-				// metric.GetName() != "process_mysql_queries_total" &&
-				// metric.GetName() != "process_mysql_queries_duration_seconds_total" &&
+				metric.GetName() != "process_mysql_queries_total" &&
+				metric.GetName() != "process_mysql_queries_duration_seconds_total" &&
 				// metric.GetName() != "process_mysql_queries_duration_seconds_total_count" &&
-				// metric.GetName() != "process_redis_queries_total" &&
-				// metric.GetName() != "process_redis_queries_duration_seconds_total" &&
+				metric.GetName() != "process_redis_queries_total" &&
+				metric.GetName() != "process_redis_queries_duration_seconds_total" &&
 				// metric.GetName() != "process_redis_queries_duration_seconds_total_count" &&
-				// metric.GetName() != "process_postgres_queries_total" &&
-				// metric.GetName() != "process_postgres_queries_duration_seconds_total" &&
+				metric.GetName() != "process_postgres_queries_total" &&
+				metric.GetName() != "process_postgres_queries_duration_seconds_total" &&
 				// metric.GetName() != "process_postgres_queries_duration_seconds_total_count" &&
-				regex.MatchString(metric.GetName()) == false &&
+				// regex.MatchString(metric.GetName()) == false &&
+				metric.GetName() != "process_dm_queries_total" &&
+				metric.GetName() != "process_dm_queries_duration_seconds_total" &&
 				metric.GetName() != "process_application_type" &&
 				metric.GetName() != "process_net_tcp_bytes_received_per" &&
 				metric.GetName() != "process_net_tcp_bytes_sent_per" &&