Просмотр исходного кода

Feature #TASK_QT-33508 ebpf批量上报心跳

Carl 4 месяцев назад
Родитель
Commit
b462743400

+ 265 - 0
containers/apm_heartbeat_batch.go

@@ -0,0 +1,265 @@
+package containers
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/coroot/coroot-node-agent/flags"
+	"github.com/coroot/coroot-node-agent/utils"
+	. "github.com/coroot/coroot-node-agent/utils/modelse"
+	klog "github.com/sirupsen/logrus"
+)
+
+const (
+	HeartbeatBatchInterval = 60 * time.Second // Heartbeat interval: 60 seconds.
+	MaxBatchSize           = 1000             // At most 1000 entries per request.
+)
+
+// GenerateProcessId builds the identifier reported for a process, in the form
+// {serviceType}-{serviceName}-{hash}.
+//
+// The hash is utils.HashTo16DigitString applied to systemUUID, ip and port
+// concatenated with no separator. An empty systemUUID is replaced by
+// utils.GetSystemUUID().
+func GenerateProcessId(serviceType, serviceName, systemUUID, ip string, port int) string {
+	uuid := systemUUID
+	if uuid == "" {
+		uuid = utils.GetSystemUUID()
+	}
+	// Hash input: the three values joined back to back, port in decimal.
+	digest := utils.HashTo16DigitString(uuid + ip + strconv.Itoa(port))
+	return serviceType + "-" + serviceName + "-" + digest
+}
+
+// CollectProcessHeartbeatInfo 收集所有容器的进程心跳信息
+func (r *Registry) CollectProcessHeartbeatInfo() []ProcessHeartbeatInfo {
+	var processes []ProcessHeartbeatInfo
+	nodeInfo := r.nodeInfo.GetNodeInfo()
+	if nodeInfo == nil {
+		return processes
+	}
+
+	systemUUID := nodeInfo.SystemUUID
+	if systemUUID == "" {
+		systemUUID = utils.GetSystemUUID()
+	}
+
+	// 使用 RegistryApps 获取进程信息(已在 handleEvents 中更新)
+	// 为了安全,我们创建一个快照
+	appsSnapshot := make(map[uint32]AppStatusInfo)
+	for pid, app := range r.RegistryApps {
+		appsSnapshot[pid] = app
+	}
+
+	// 遍历所有注册的应用
+	for pid, appInfo := range appsSnapshot {
+		// 获取服务类型
+		serviceType := appInfo.Language
+		if serviceType == "" {
+			serviceType = "UNKNOWN"
+		}
+
+		// 获取 IP 和端口
+		ip := ""
+		port := 0
+		if appInfo.Sn != "" {
+			// Sn 格式可能是 "ip:port" 或 "ip1:port1,ip2:port2"(多个地址用逗号分隔)
+			// 取第一个地址
+			firstAddr := appInfo.Sn
+			if idx := strings.Index(appInfo.Sn, ","); idx > 0 {
+				firstAddr = appInfo.Sn[:idx]
+			}
+			// 解析 ip:port
+			parts := strings.Split(firstAddr, ":")
+			if len(parts) >= 1 {
+				ip = parts[0]
+			}
+			if len(parts) >= 2 {
+				if p, err := strconv.Atoi(parts[1]); err == nil {
+					port = p
+				}
+			}
+		}
+		if ip == "" {
+			ip = nodeInfo.HostIp
+		}
+		if port == 0 {
+			port = appInfo.Sport
+		}
+		// 如果 port 仍然为 0,使用默认值或跳过(根据业务需求)
+		// 这里保留 port=0,因为有些服务可能没有监听端口
+
+		// 生成 processId
+		processId := GenerateProcessId(serviceType, appInfo.AppName, systemUUID, ip, port)
+
+		// 判断进程状态:根据 AppInfo.Status 判断
+		// status=1: 运行中,占用配额
+		// status=2: 已停止,释放配额
+		status := 1 // 运行中
+		if appInfo.Status == APP_UNINSTALL || appInfo.Status == APP_FUSE || appInfo.Status == APP_LICENSE_FUSE {
+			status = 2 // 已停止
+		}
+		// 注意:APP_UPROBE_ERROR 和 APP_STACK_ERROR 虽然表示错误,但进程仍在运行,所以 status=1
+
+		processInfo := ProcessHeartbeatInfo{
+			ProcessId:   processId,
+			ServiceName: appInfo.AppName,
+			ServiceType: serviceType,
+			Pid:         int(pid),
+			Ip:          ip,
+			Port:        port,
+			Timestamp:   time.Now().UnixMilli(),
+			Status:      status,
+		}
+
+		processes = append(processes, processInfo)
+
+		// 限制批量大小
+		if len(processes) >= MaxBatchSize {
+			break
+		}
+	}
+
+	return processes
+}
+
+// TaskHeartbeatBatch runs DoHeartbeatBatch once every HeartbeatBatchInterval.
+// It never returns; NewRegistry launches it through try.Go.
+func (r *Registry) TaskHeartbeatBatch() {
+	ticker := time.NewTicker(HeartbeatBatchInterval)
+	defer ticker.Stop()
+
+	// Ranging over the ticker channel replaces a single-case select in a loop.
+	for range ticker.C {
+		// DoHeartbeatBatch reports a missing node info through its return
+		// value; log it instead of dropping it silently.
+		if err := r.DoHeartbeatBatch(); err != nil {
+			klog.WithError(err).Warn("[heartbeat batch] heartbeat cycle skipped")
+		}
+	}
+}
+
+// DoHeartbeatBatch collects the current process list and reports it to the
+// server in chunks of at most MaxBatchSize entries.
+//
+// It returns an error only when node info is unavailable. A zero AccountID or
+// an empty process list ends the cycle early with nil, and a failed chunk is
+// logged without aborting the remaining ones.
+func (r *Registry) DoHeartbeatBatch() error {
+	node := r.nodeInfo.GetNodeInfo()
+	switch {
+	case node == nil:
+		return fmt.Errorf("nodeInfo is nil")
+	case node.AccountID == 0:
+		klog.Debugf("[heartbeat batch] AccountID is 0, skip heartbeat")
+		return nil
+	}
+
+	all := r.CollectProcessHeartbeatInfo()
+	total := len(all)
+	if total == 0 {
+		klog.Debugf("[heartbeat batch] No processes to report")
+		return nil
+	}
+
+	for from := 0; from < total; from += MaxBatchSize {
+		to := from + MaxBatchSize
+		if to > total {
+			to = total
+		}
+		if err := r.sendHeartbeatBatch(node, all[from:to]); err != nil {
+			// Keep going: one failed chunk must not block the others.
+			klog.WithError(err).Errorf("[heartbeat batch] Failed to send batch %d-%d", from, to-1)
+		}
+	}
+
+	return nil
+}
+
+// sendHeartbeatBatch sends one chunk of heartbeat entries through
+// r.connServer.HeartbeatBatch and passes the response to
+// handleHeartbeatBatchResponse, which may trigger or clear LicenseFuse.
+//
+// nodeInfo.HostID is used as the agent id, falling back to utils.GetHostID()
+// when it is 0. A failed API call is returned wrapped; otherwise nil.
+func (r *Registry) sendHeartbeatBatch(nodeInfo *NodeInfoT, processes []ProcessHeartbeatInfo) error {
+	agentID := nodeInfo.HostID
+	if agentID == 0 {
+		agentID = utils.GetHostID()
+	}
+
+	resp, err := r.connServer.HeartbeatBatch(EuspaceHeartBatchRequest{
+		AccountId:      nodeInfo.AccountID,
+		EuspaceAgentId: agentID,
+		EuspaceVersion: nodeInfo.AgentVersion,
+		HostIp:         nodeInfo.HostIp,
+		Processes:      processes,
+	})
+	if err != nil {
+		return fmt.Errorf("heartbeat batch API call failed: %w", err)
+	}
+
+	r.handleHeartbeatBatchResponse(resp, processes)
+	return nil
+}
+
+// handleHeartbeatBatchResponse applies the per-process results of a heartbeat
+// request to the matching containers.
+//
+// When *flags.EnableLicenseCheck is set, a failed result carrying
+// QUOTA_EXCEED_MAX or QUOTA_TIME_EXCEED_LICENSE calls LicenseFuse on the
+// container, and a successful result calls ClearLicenseFuse. Results are
+// matched to containers by ProcessId -> pid -> container.
+func (r *Registry) handleHeartbeatBatchResponse(resp EuspaceHeartBatchResponse, processes []ProcessHeartbeatInfo) {
+	// ProcessId -> pid for the entries that were sent.
+	pidByProcessId := make(map[string]uint32)
+	for i := range processes {
+		pidByProcessId[processes[i].ProcessId] = uint32(processes[i].Pid)
+	}
+
+	// pid -> container for every registered application.
+	// NOTE(review): containersByPid is read here without taking a lock —
+	// confirm that no other goroutine writes to it concurrently.
+	containerByPid := make(map[uint32]*Container)
+	for pid := range r.RegistryApps {
+		c, found := r.containersByPid[pid]
+		if found && c != nil {
+			containerByPid[pid] = c
+		}
+	}
+
+	// A count mismatch is only reported; the results present are still handled.
+	if got, want := len(resp.Results), len(processes); got != want {
+		klog.Warnf("[heartbeat batch] Response results count (%d) doesn't match processes count (%d)",
+			got, want)
+	}
+
+	hasQuotaError := false
+	if *flags.EnableLicenseCheck {
+		for _, res := range resp.Results {
+			pid, pidKnown := pidByProcessId[res.ProcessId]
+
+			// Success: lift the fuse if the container can be resolved.
+			if res.Success {
+				if pidKnown {
+					if c, ok := containerByPid[pid]; ok {
+						c.ClearLicenseFuse()
+					}
+				}
+				continue
+			}
+
+			// Only quota-related failures trigger the fuse.
+			switch res.ErrorCode {
+			case "QUOTA_EXCEED_MAX", "QUOTA_TIME_EXCEED_LICENSE":
+			default:
+				continue
+			}
+			hasQuotaError = true
+
+			if !pidKnown {
+				klog.Warnf("[heartbeat batch] LicenseFuse triggered but pid not found for processId=%s",
+					res.ProcessId)
+				continue
+			}
+			c, ok := containerByPid[pid]
+			if !ok {
+				klog.Warnf("[heartbeat batch] LicenseFuse triggered but container not found for processId=%s, pid=%d",
+					res.ProcessId, pid)
+				continue
+			}
+			c.LicenseFuse()
+			klog.Warnf("[heartbeat batch] LicenseFuse triggered for processId=%s, pid=%d, errorCode=%s, errorMessage=%s",
+				res.ProcessId, pid, res.ErrorCode, res.ErrorMessage)
+		}
+	}
+
+	if !hasQuotaError {
+		klog.Debugf("[heartbeat batch] Successfully reported %d processes, failed %d",
+			resp.SuccessCount, resp.FailedCount)
+		return
+	}
+	klog.Warnf("[heartbeat batch] Quota error detected, LicenseFuse triggered for affected containers")
+}

+ 6 - 5
containers/container.go

@@ -103,11 +103,11 @@ type ActiveConnection struct {
 	FirstWriteTime uint64
 	NewReadTime    uint64
 
-	http2Parser      *l7.Http2Parser
-	postgresParser   *l7.PostgresParser
-	mysqlParser      *l7.MysqlParser
-	dmParser         *l7.DmParser
-	cassandraParser  *l7.CassandraParser
+	http2Parser     *l7.Http2Parser
+	postgresParser  *l7.PostgresParser
+	mysqlParser     *l7.MysqlParser
+	dmParser        *l7.DmParser
+	cassandraParser *l7.CassandraParser
 }
 
 type ActiveAccept struct {
@@ -218,6 +218,7 @@ type Container struct {
 	UprobesMap   map[string]tracer.Uprobe
 	l7EventReady bool
 	l7Attach     bool
+	licenseFuse  bool
 	// 白名单详情
 	WhiteSettingInfo WhiteSettingInfo
 	// 应用详情

+ 29 - 2
containers/container_apm.go

@@ -832,6 +832,24 @@ func (c *Container) l7AttachSuccess() {
 	c.l7Attach = true
 }
 
+// IsLicenseFuse reports whether the container's licenseFuse flag is set.
+// The flag is read while holding c.lock.
+func (c *Container) IsLicenseFuse() bool {
+	c.lock.Lock()
+	fused := c.licenseFuse
+	c.lock.Unlock()
+	return fused
+}
+
+// LicenseFuse sets the container's licenseFuse flag while holding c.lock.
+func (c *Container) LicenseFuse() {
+	c.lock.Lock()
+	c.licenseFuse = true
+	c.lock.Unlock()
+}
+
+// ClearLicenseFuse resets the container's licenseFuse flag while holding c.lock.
+func (c *Container) ClearLicenseFuse() {
+	c.lock.Lock()
+	c.licenseFuse = false
+	c.lock.Unlock()
+}
+
 func (c *Container) ctrlStack(r *Registry, pid uint32) {
 	resp, err := c.GetCodeSetting(r)
 	if err != nil {
@@ -963,7 +981,7 @@ func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachT
 		}
 		p.uprobes = []link.Link{}
 		switch detachType {
-		case APP_UNINSTALL, APP_FUSE:
+		case APP_UNINSTALL, APP_FUSE, APP_LICENSE_FUSE:
 			codeType := c.GetCodeTypeFromCache(pid)
 			switch codeType {
 			case CodeTypeJava:
@@ -981,7 +999,7 @@ func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachT
 		if err := tracer.DelKProcInfo(pid); err != nil {
 			return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
 		} else {
-			klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
+			klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%s", pid, detachType.String())
 			c.AppInfo.EBPFProcInfo = nil
 		}
 	} else {
@@ -1095,6 +1113,15 @@ func (c *Container) AgentCtrl(r *Registry, pid uint32) {
 	var err error
 	verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
 
+	// License fuse UNINSTALL(仅在开启 License 校验时)
+	if *flags.EnableLicenseCheck && c.IsLicenseFuse() {
+		if c.Isl7AttachSuccess() {
+			c.Detach(r.tracer, pid, APP_LICENSE_FUSE)
+		}
+		klog.WithField("pid", pid).Infoln("[AgentCtrl] License fusing.")
+		return
+	}
+
 	// fusing UNINSTALL
 	if r.isFusing && c.Isl7AttachSuccess() {
 		c.Detach(r.tracer, pid, APP_FUSE)

+ 6 - 1
containers/registry.go

@@ -166,6 +166,10 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, nodeInfo *Node
 			return nil, err
 		}
 		try.Go(r.TaskRegisterHost, CatchFn)
+		// 启动心跳批量上报任务(仅在开启 License 校验时)
+		if *flags.EnableLicenseCheck {
+			try.Go(r.TaskHeartbeatBatch, CatchFn)
+		}
 	}
 
 	if err = reg.Register(r); err != nil {
@@ -301,7 +305,8 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				}
 			}
 			r.RegistryApps = runtimeApps
-			saveAppInfo(r.RegistryApps)
+			snapAppInfo(r.RegistryApps)
+
 			if r.isFusing {
 				fuseOnce = true
 			} else {

+ 1 - 1
containers/registry_apm.go

@@ -39,7 +39,7 @@ func setK8sTag(c *Container, tag map[string]string, pid uint32) {
 	c.K8sContainer.pid = sPid
 }
 
-func saveAppInfo(runtimeApps map[uint32]modelse.AppStatusInfo) {
+func snapAppInfo(runtimeApps map[uint32]modelse.AppStatusInfo) {
 	appStr, _ := json.Marshal(runtimeApps)
 	dumpPath := path.Join(utils.GetDefaultRuntimePath(), "memdump")
 	err := os.MkdirAll(dumpPath, 0755)

+ 7 - 4
flags/flags.go

@@ -40,10 +40,10 @@ var (
 	EnableElasticsearchDetection = kingpin.Flag("enable-es", "Enable Elasticsearch detection in HTTP requests").Default("false").Envar("ENABLE_ES").Bool()
 
 	ExternalNetworksWhitelist = kingpin.
-					Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
-					Envar("TRACK_PUBLIC_NETWORK").
-					Default("0.0.0.0/0").
-					Strings()
+		Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
+		Envar("TRACK_PUBLIC_NETWORK").
+		Default("0.0.0.0/0").
+		Strings()
 	EphemeralPortRange = kingpin.Flag("ephemeral-port-range", "Destination and Listen TCP ports from this range will be skipped").Default("").Envar("EPHEMERAL_PORT_RANGE").String()
 
 	Provider          = kingpin.Flag("provider", "`provider` label for `node_cloud_info` metric").Envar("PROVIDER").String()
@@ -83,6 +83,8 @@ var (
 	RegisterAppToDoop = kingpin.Flag("register-app-to-doop", "Register the app-info to the doop").Default("false").Envar("REGISTER_APP_TO_DOOP").Bool()
 	//是否向平台发送网络数据
 	SendNetData = kingpin.Flag("send-net-data", "Send the net data to platform").Default("false").Envar("SEND_NET_DATA").Bool()
+	//是否开启License校验逻辑
+	EnableLicenseCheck = kingpin.Flag("enable-license-check", "Enable license check and quota validation").Default("true").Envar("ENABLE_LICENSE_CHECK").Bool()
 )
 
 var AgentName = "euspace"
@@ -115,6 +117,7 @@ func init() {
 		}
 		*ConsoleLog = true
 		*LogLevel = "debug"
+		*EnableLicenseCheck = false
 		euspace.Print()
 	} else {
 		euspace := figure.NewColorFigure("Euspace", "slant", "blue", true)

+ 2 - 2
utils/id.go

@@ -24,7 +24,7 @@ func BuildInt64ID(str string) ID_STRING {
 	//code := fmt.Sprintf("%x", srcCode)
 	//id_string := md5ToDec(code)
 	//return ID_STRING(id_string)
-	return ID_STRING(hashTo16DigitString(str))
+	return ID_STRING(HashTo16DigitString(str))
 }
 
 func hexStringToBPFBytes(str string, out *HashByte) {
@@ -142,7 +142,7 @@ func md5ToDec(str string) string {
 	return strCode
 }
 
-func hashTo16DigitString(appName string) string {
+func HashTo16DigitString(appName string) string {
 	// 计算 MD5 哈希
 	hash := md5.Sum([]byte(appName))
 	// 将哈希值转换为一个大整数

+ 7 - 0
utils/modelse/app_info.go

@@ -20,6 +20,8 @@ const (
 	APP_UNINSTALL_ERROR
 	APP_FUSE
 	APP_FUSE_ERROR
+	APP_LICENSE_FUSE
+	APP_LICENSE_FUSE_ERROR
 )
 
 const (
@@ -32,6 +34,8 @@ func (s APP_TYPE) Error() APP_TYPE {
 		return APP_UNINSTALL_ERROR
 	case APP_FUSE:
 		return APP_FUSE_ERROR
+	case APP_LICENSE_FUSE:
+		return APP_LICENSE_FUSE_ERROR
 	default:
 		return APP_UPROBE_ERROR
 	}
@@ -47,6 +51,7 @@ func (s APP_TYPE) IsError() bool {
 	switch s {
 	case APP_UNINSTALL_ERROR:
 	case APP_FUSE_ERROR:
+	case APP_LICENSE_FUSE_ERROR:
 	case APP_UPROBE_ERROR:
 		return true
 	default:
@@ -65,6 +70,8 @@ func (s APP_TYPE) String() string {
 		return "UNINSTALL"
 	case APP_FUSE:
 		return "FUSE"
+	case APP_LICENSE_FUSE:
+		return "LICENSE FUSE"
 	default:
 		return "-"
 	}

+ 37 - 0
utils/modelse/models.go

@@ -3,6 +3,7 @@ package modelse
 import (
 	"encoding/json"
 	"fmt"
+
 	log "github.com/sirupsen/logrus"
 )
 
@@ -319,6 +320,42 @@ type EbpfAppReq struct {
 
 type EbpfAppResp map[string]string
 
+// EuspaceHeartBatchRequest is the request body of the batch heartbeat report.
+type EuspaceHeartBatchRequest struct {
+	AccountId      int                    `json:"accountId"`
+	EuspaceAgentId int64                  `json:"euspaceAgentId"`
+	EuspaceVersion string                 `json:"euspaceVersion"`
+	HostIp         string                 `json:"hostIp"`
+	Processes      []ProcessHeartbeatInfo `json:"processes"`
+}
+
+// ProcessHeartbeatInfo is the heartbeat entry reported for a single process.
+type ProcessHeartbeatInfo struct {
+	ProcessId   string `json:"processId"`
+	ServiceName string `json:"serviceName"`
+	ServiceType string `json:"serviceType,omitempty"`
+	Pid         int    `json:"pid,omitempty"`
+	Ip          string `json:"ip,omitempty"`
+	Port        int    `json:"port,omitempty"`
+	Timestamp   int64  `json:"timestamp,omitempty"`
+	Status      int    `json:"status,omitempty"` // 1 = running, 2 = stopped
+}
+
+// EuspaceHeartBatchResponse is the response body of the batch heartbeat report.
+type EuspaceHeartBatchResponse struct {
+	SuccessCount int                      `json:"successCount"`
+	FailedCount  int                      `json:"failedCount"`
+	Results      []ProcessHeartbeatResult `json:"results"`
+}
+
+// ProcessHeartbeatResult is the per-process outcome returned by the server.
+type ProcessHeartbeatResult struct {
+	ProcessId    string `json:"processId"`
+	Success      bool   `json:"success"`
+	ErrorCode    string `json:"errorCode,omitempty"`
+	ErrorMessage string `json:"errorMessage,omitempty"`
+}
+
 type TaskStatus struct {
 	TaskId     string `json:"taskid"`
 	TaskStatus string `json:"taskStatus"`

+ 16 - 0
utils/worker/serverWorker.go

@@ -43,6 +43,8 @@ type ServerWorker interface {
 	PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
 
 	GetCodeSetting(CodeSettingReq) (CodeSettingResp, error)
+
+	HeartbeatBatch(EuspaceHeartBatchRequest) (EuspaceHeartBatchResponse, error)
 	//SyncAgentInfo(RegistRequest) ([]AgentList, error)
 
 	//Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
@@ -170,6 +172,20 @@ func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error
 	return response, nil
 }
 
+func (w *ServerHTTPWorker) HeartbeatBatch(request EuspaceHeartBatchRequest) (EuspaceHeartBatchResponse, error) {
+	response := EuspaceHeartBatchResponse{}
+	result, err := w.requestServer("/api/v2/euspace/heartbeat/batch", request)
+	if err != nil {
+		return response, err
+	}
+	err = json.Unmarshal(result, &response)
+	if err != nil {
+		log.WithError(err).Errorf("[server heartbeat batch] Failed HeartbeatBatch request:%v.", utils.ToString(request))
+		return response, err
+	}
+	return response, nil
+}
+
 type ServerHTTPWorker struct {
 	currentIndex    int // 标识当前使用的是那个链接方式
 	countTotal      int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮