Parcourir la source

Merge branch 'dev-newnet' into dev

Carl il y a 1 an
Parent
commit
4ea3a8709a
3 fichiers modifiés avec 159 ajouts et 35 suppressions
  1. 0 20
      containers/container.go
  2. 10 1
      ebpftracer/ebpf/tcp/state.c
  3. 149 14
      main.go

+ 0 - 20
containers/container.go

@@ -371,26 +371,6 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		}
 	}
 
-	type AppInfo struct {
-		AppName        string        `json:"app_name"`
-		AppIdHash      INT_HASH_ID   `json:"app_id_hash"`
-		InstanceIdHash INT_HASH_ID   `json:"instance_id_hash"`
-		AgentId        int64         `json:"agent_id"`
-		Sn             string        `json:"sn"`
-		Sport          int           `json:"sport"`
-		ServiceName    string        `json:"service_name"`
-		CodeType       CodeType      `json:"code_type"`
-		EBPFProcInfo   *EbpfProcInfo `json:"ebpf_proc_info"`
-		RegisterAt     int64         `json:"register_at"`
-		UpdateAt       int64         `json:"update_at"`
-		Status         APP_TYPE      `json:"status"`
-	}
-
-	type INT_HASH_ID struct {
-		IntVal   int64
-		HashtVal HashByte
-	}
-
 	strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
 	strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
 	strAppName := c.AppInfo.AppName

+ 10 - 1
ebpftracer/ebpf/tcp/state.c

@@ -389,6 +389,15 @@ struct sys_enter_accept4_ctx {
 	int addrlen;
 };
 
+struct sys_enter_accept_ctx {
+	__u64 __unused_syscall_header;
+	__u32 __unused_syscall_nr;
+
+	long fd;
+	__u64 *sockaddr;
+	int addrlen;
+};
+
 struct sys_exit_accept_ctx {
 	__u64 __unused_syscall_header;
 	__u32 __unused_syscall_nr;
@@ -403,7 +412,7 @@ int tracepoint__sys_enter_accept4(struct sys_enter_accept4_ctx *ctx) {
 }  
 
 SEC("tracepoint/syscalls/sys_enter_accept")  
-int tracepoint__sys_enter_accept(struct trace_event_raw_sys_enter *ctx) {  
+int tracepoint__sys_enter_accept(struct sys_enter_accept_ctx *ctx) {  
     __u64 pid_tgid = bpf_get_current_pid_tgid();  
     cw_bpf_debug("[Go] [socket/tracepoint__sys_entry_accept----]getget: rdi_ptr::pid: %d\n", pid_tgid);  
     return 0;  

+ 149 - 14
main.go

@@ -38,7 +38,7 @@ import (
 
 var (
 	version = "unknown"
-	// sendNetDataInterval  = 1 * time.Minute
+	sendNetDataInterval  = 1 * time.Minute
 )
 
 const minSupportedKernelVersion = "4.18"
@@ -211,6 +211,153 @@ func main() {
 	//profiling.Start()
 	//defer profiling.Stop()
 	// 创建一个/metrics路由处理函数
+	sendNetDataFunc := func() {
+		// 从注册表中获取指标数据
+		metrics, err := registry.Gather()
+		if err != nil {
+			// 错误处理
+			return
+		}
+
+		// 创建正则表达式对象  
+		regex, err := regexp.Compile(`^process_.+_queries_total$`)  
+		if err != nil {  
+			return  
+		}
+
+		var postData PostData
+		postData.AccountID = strconv.Itoa(nodeInfo.AccountID)
+		postData.IP = nodeInfo.HostIp
+		postData.HostID = nodeInfo.HostID
+		postData.TimeStamp = uint64(time.Now().UnixNano())
+		postData.ServiceType = 30002
+		postData.HostName = nodeInfo.Hostname
+		for _, metric := range metrics {
+			if metric.GetName() != "process_net_tcp_successful_connects_total" &&
+				metric.GetName() != "process_net_tcp_failed_connects_total" &&
+				metric.GetName() != "process_net_tcp_retransmits_total" &&
+				metric.GetName() != "process_net_tcp_listen_info" &&
+				metric.GetName() != "process_http_requests_total" &&
+				metric.GetName() != "process_http_requests_duration_seconds_total" &&
+				metric.GetName() != "process_http_requests_duration_seconds_total_count" &&
+				// metric.GetName() != "process_mysql_queries_total" &&
+				// metric.GetName() != "process_mysql_queries_duration_seconds_total" &&
+				// metric.GetName() != "process_mysql_queries_duration_seconds_total_count" &&
+				// metric.GetName() != "process_redis_queries_total" &&
+				// metric.GetName() != "process_redis_queries_duration_seconds_total" &&
+				// metric.GetName() != "process_redis_queries_duration_seconds_total_count" &&
+				// metric.GetName() != "process_postgres_queries_total" &&
+				// metric.GetName() != "process_postgres_queries_duration_seconds_total" &&
+				// metric.GetName() != "process_postgres_queries_duration_seconds_total_count" &&
+				regex.MatchString(metric.GetName()) == false &&
+				metric.GetName() != "process_application_type" &&
+				metric.GetName() != "process_net_tcp_bytes_received_per" &&
+				metric.GetName() != "process_net_tcp_bytes_sent_per" &&
+				metric.GetName() != "process_net_tcp_bytes_received_total" &&
+				metric.GetName() != "process_net_tcp_bytes_sent_total" &&
+				metric.GetName() != "process_net_tcp_data_latency" &&
+				metric.GetName() != "process_net_tcp_data_duration" &&
+				metric.GetName() != "process_net_tcp_est_time"{
+				continue
+			}
+
+			var item MetricData
+			var itemOther MetricData
+			item.MetricKey = metric.GetName()
+
+			for _, m := range metric.GetMetric() {
+				metricItem := MetricItemData{}
+				label := make(map[string]string)
+				for _, l := range m.GetLabel() {
+					label[l.GetName()] = l.GetValue()
+				}
+				metricItem.Label = label
+				switch metric.GetType() {
+				case dto.MetricType_COUNTER:
+					metricItem.Value = m.GetCounter().GetValue()
+					item.Metric = append(item.Metric, metricItem)
+				case dto.MetricType_GAUGE:
+					metricItem.Value = m.GetGauge().GetValue()
+					item.Metric = append(item.Metric, metricItem)
+				case dto.MetricType_HISTOGRAM:
+					item.MetricKey = metric.GetName() + "_sum"
+					metricItem.Value = m.GetHistogram().GetSampleSum()
+					item.Metric = append(item.Metric, metricItem)
+					metricItemOther := MetricItemData{}
+					metricItemOther.Label = label
+
+					itemOther.MetricKey = metric.GetName() + "_count"
+					metricItemOther.Value = m.GetHistogram().GetSampleCount()
+					itemOther.Metric = append(itemOther.Metric, metricItemOther)
+				default:
+					continue
+				}
+			}
+			postData.Data = append(postData.Data, item)
+			if metric.GetType() == dto.MetricType_HISTOGRAM {
+				postData.Data = append(postData.Data, itemOther)
+			}
+		}
+		// 将指标数据转换为JSON格式
+		jsonData, err := json.Marshal(postData)
+		//jsonData, err := json.Marshal(metrics)
+		if err != nil {
+			return
+		}
+
+		// 创建请求  
+		urlRoute := "/api/v2/ebpf/receive"
+		// req, err := http.NewRequest("POST", "http://10.0.7.115:18080/api/v2/ebpf/receive", bytes.NewBuffer(jsonData))
+		req, err := http.NewRequest("POST", *flags.DataServer + urlRoute, bytes.NewBuffer(jsonData))  
+		if err != nil {  
+			fmt.Println("Error:", err)  
+			return  
+		}  
+
+		// 添加 Content-Type header  
+		req.Header.Add("Content-Type", "application/json")  
+
+		// 添加一个自定义 header  
+		req.Header.Add("DataCount", strconv.Itoa(len(postData.Data)))  
+		req.Header.Add("Account-Id", strconv.Itoa(nodeInfo.AccountID))  
+		req.Header.Add("ip", nodeInfo.HostIp)  
+
+		// 创建 HTTP 客户端  
+		client := &http.Client{}  
+
+		// 发送 HTTP POST 请求  
+		response, err := client.Do(req)  
+		if err != nil {  
+			fmt.Println("Error:", err)  
+			return  
+		}  
+		defer response.Body.Close()  
+
+		// 读取响应内容  
+		responseData, err := io.ReadAll(response.Body)  
+		if err != nil {  
+			fmt.Println("Error:", err)  
+			return  
+		}  
+
+		// 输出响应状态码和响应正文  
+		fmt.Println("Status Code:", response.StatusCode)  
+		fmt.Println("Response Body:", string(responseData)) 
+	}
+	sendNetDataDone := make(chan struct{})
+	go func() {
+		sendNetDataTicker := time.NewTicker(sendNetDataInterval)
+		defer sendNetDataTicker.Stop()
+		for {
+			select {
+			case <-sendNetDataDone:
+				return
+			case _ = <-sendNetDataTicker.C:
+				sendNetDataFunc()
+			}
+		}
+	}()
+
 	metricsHandler := func(w http.ResponseWriter, r *http.Request) {
 		// 从注册表中获取指标数据
 		metrics, err := registry.Gather()
@@ -351,23 +498,11 @@ func main() {
 		log.Fatalln(err)
 	}
 
-	// go func() {
-	// 	ticker := time.NewTicker(sendNetDataInterval)
-	// 	defer ticker.Stop()
-	// 	for {
-	// 		select {
-	// 		case <-c.done:
-	// 			return
-	// 		case t := <-ticker.C:
-	// 			c.gc(t)
-	// 		}
-	// 	}
-	// }()
-
 	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: logger{}, Registry: registerer}))
 	http.HandleFunc("/metrics2", metricsHandler)
 	log.Infoln("listening on:", *flags.ListenAddress)
 	log.Errorln(http.ListenAndServe(*flags.ListenAddress, nil))
+	close(sendNetDataDone)
 }
 
 func info(name, version string) prometheus.Collector {