| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- package containers
import (
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	"github.com/coroot/coroot-node-agent/flags"
	"github.com/coroot/coroot-node-agent/utils"
	. "github.com/coroot/coroot-node-agent/utils/modelse"
	klog "github.com/sirupsen/logrus"
)
// Tunables for the batch heartbeat reporter.
const (
	// HeartbeatBatchInterval is the period between two batch heartbeat rounds.
	HeartbeatBatchInterval = time.Minute

	// MaxBatchSize is the maximum number of process entries sent in a single
	// heartbeat batch request.
	MaxBatchSize = 1000
)
- // GenerateProcessId 生成进程唯一标识
- // 格式: {serviceType}-{serviceName}-{hash}
- // hash 生成规则: hash(systemuuid + ip + port)
- func GenerateProcessId(serviceType, serviceName, systemUUID, ip string, port int) string {
- if systemUUID == "" {
- systemUUID = utils.GetSystemUUID()
- }
- // 生成 hash: hash(systemuuid + ip + port)
- hashInput := fmt.Sprintf("%s%s%d", systemUUID, ip, port)
- hash := utils.HashTo16CharString(hashInput)
- // 格式: {serviceType}-{serviceName}-{hash}
- return fmt.Sprintf("%s-%s-%s", serviceType, serviceName, hash)
- }
- // CollectProcessHeartbeatInfo 收集所有容器的进程心跳信息
- func (r *Registry) CollectProcessHeartbeatInfo() []ProcessHeartbeatInfo {
- var processes []ProcessHeartbeatInfo
- nodeInfo := r.nodeInfo.GetNodeInfo()
- if nodeInfo == nil {
- return processes
- }
- systemUUID := nodeInfo.SystemUUID
- if systemUUID == "" {
- systemUUID = utils.GetSystemUUID()
- }
- // 使用 RegistryApps 获取进程信息(已在 handleEvents 中更新)
- // 为了安全,我们创建一个快照
- appsSnapshot := make(map[uint32]AppStatusInfo)
- for pid, app := range r.RegistryApps {
- appsSnapshot[pid] = app
- }
- // 遍历所有注册的应用
- for pid, appInfo := range appsSnapshot {
- // 获取服务类型
- serviceType := appInfo.Language
- if serviceType == "" {
- serviceType = "UNKNOWN"
- }
- // 获取 IP 和端口
- ip := ""
- port := 0
- if appInfo.Sn != "" {
- // Sn 格式可能是 "ip:port" 或 "ip1:port1,ip2:port2"(多个地址用逗号分隔)
- // 取第一个地址
- firstAddr := appInfo.Sn
- if idx := strings.Index(appInfo.Sn, ","); idx > 0 {
- firstAddr = appInfo.Sn[:idx]
- }
- // 解析 ip:port
- parts := strings.Split(firstAddr, ":")
- if len(parts) >= 1 {
- ip = parts[0]
- }
- if len(parts) >= 2 {
- if p, err := strconv.Atoi(parts[1]); err == nil {
- port = p
- }
- }
- }
- if ip == "" {
- ip = nodeInfo.HostIp
- }
- if port == 0 {
- port = appInfo.Sport
- }
- // 如果 port 仍然为 0,使用默认值或跳过(根据业务需求)
- // 这里保留 port=0,因为有些服务可能没有监听端口
- // 生成 processId
- processId := GenerateProcessId(serviceType, appInfo.AppName, systemUUID, ip, port)
- // 判断进程状态:根据 AppInfo.Status 判断
- // status=1: 运行中,占用配额
- // status=2: 已停止,释放配额
- status := 1 // 运行中
- if appInfo.Status == APP_UNINSTALL || appInfo.Status == APP_FUSE || appInfo.Status == APP_LICENSE_FUSE {
- status = 2 // 已停止
- }
- // 注意:APP_UPROBE_ERROR 和 APP_STACK_ERROR 虽然表示错误,但进程仍在运行,所以 status=1
- processInfo := ProcessHeartbeatInfo{
- ProcessId: processId,
- ServiceName: appInfo.AppName,
- ServiceType: serviceType,
- Pid: int(pid),
- Ip: ip,
- Port: port,
- Timestamp: time.Now().UnixMilli(),
- Status: status,
- }
- processes = append(processes, processInfo)
- // 限制批量大小
- if len(processes) >= MaxBatchSize {
- break
- }
- }
- return processes
- }
- // TaskHeartbeatBatch 心跳批量上报任务
- func (r *Registry) TaskHeartbeatBatch() {
- ticker := time.NewTicker(HeartbeatBatchInterval)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- if !*flags.EnableLicenseCheck {
- continue
- }
- r.DoHeartbeatBatch()
- }
- }
- }
- // DoHeartbeatBatch 执行心跳批量上报
- func (r *Registry) DoHeartbeatBatch() error {
- nodeInfo := r.nodeInfo.GetNodeInfo()
- if nodeInfo == nil {
- return fmt.Errorf("nodeInfo is nil")
- }
- if nodeInfo.AccountID == 0 {
- klog.Debugf("[heartbeat batch] AccountID is 0, skip heartbeat")
- return nil
- }
- // 收集进程信息
- processes := r.CollectProcessHeartbeatInfo()
- if len(processes) == 0 {
- klog.Debugf("[heartbeat batch] No processes to report")
- return nil
- }
- // 分批上报(每批最多 1000 条)
- for i := 0; i < len(processes); i += MaxBatchSize {
- end := i + MaxBatchSize
- if end > len(processes) {
- end = len(processes)
- }
- batch := processes[i:end]
- err := r.sendHeartbeatBatch(nodeInfo, batch)
- if err != nil {
- klog.WithError(err).Errorf("[heartbeat batch] Failed to send batch %d-%d", i, end-1)
- // 继续处理下一批,不中断
- }
- }
- return nil
- }
- // sendHeartbeatBatch 发送单批心跳数据
- func (r *Registry) sendHeartbeatBatch(nodeInfo *NodeInfoT, processes []ProcessHeartbeatInfo) error {
- // 获取 euspaceAgentId(使用 HostID)
- euspaceAgentId := nodeInfo.HostID
- if euspaceAgentId == 0 {
- euspaceAgentId = utils.GetHostID()
- }
- req := EuspaceHeartBatchRequest{
- AccountId: nodeInfo.AccountID,
- EuspaceAgentId: euspaceAgentId,
- EuspaceVersion: nodeInfo.AgentVersion,
- HostIp: nodeInfo.HostIp,
- Processes: processes,
- }
- resp, err := r.connServer.HeartbeatBatch(req)
- if err != nil {
- return fmt.Errorf("heartbeat batch API call failed: %w", err)
- }
- // 处理响应,检查配额错误并触发 LicenseFuse
- r.handleHeartbeatBatchResponse(resp, processes)
- return nil
- }
- // handleHeartbeatBatchResponse 处理心跳批量上报响应
- func (r *Registry) handleHeartbeatBatchResponse(resp EuspaceHeartBatchResponse, processes []ProcessHeartbeatInfo) {
- // 创建 processId 到 PID 的映射
- processIdToPid := make(map[string]uint32)
- for _, proc := range processes {
- processIdToPid[proc.ProcessId] = uint32(proc.Pid)
- }
- pidToContainer := make(map[uint32]*Container)
- for pid := range r.RegistryApps {
- // 尝试从 containersByPid 获取容器(不持有锁,因为只是读取)
- if c, ok := r.containersByPid[pid]; ok && c != nil {
- pidToContainer[pid] = c
- }
- }
- // 检查响应结果数量是否匹配
- if len(resp.Results) != len(processes) {
- klog.Warnf("[heartbeat batch] Response results count (%d) doesn't match processes count (%d)",
- len(resp.Results), len(processes))
- }
- // 处理每个结果(仅在开启 License 校验时)
- hasQuotaError := false
- if *flags.EnableLicenseCheck {
- for _, result := range resp.Results {
- if !result.Success {
- // 检查是否是配额相关错误
- if result.ErrorCode == "QUOTA_EXCEED_MAX" || result.ErrorCode == "QUOTA_TIME_EXCEED_LICENSE" {
- hasQuotaError = true
- // 找到对应的容器并触发 LicenseFuse
- if pid, ok := processIdToPid[result.ProcessId]; ok {
- if c, ok := pidToContainer[pid]; ok {
- c.LicenseFuse()
- klog.Warnf("[heartbeat batch] LicenseFuse triggered for processId=%s, pid=%d, errorCode=%s, errorMessage=%s",
- result.ProcessId, pid, result.ErrorCode, result.ErrorMessage)
- } else {
- klog.Warnf("[heartbeat batch] LicenseFuse triggered but container not found for processId=%s, pid=%d",
- result.ProcessId, pid)
- }
- } else {
- klog.Warnf("[heartbeat batch] LicenseFuse triggered but pid not found for processId=%s",
- result.ProcessId)
- }
- }
- } else {
- // 成功时,清除 LicenseFuse 状态
- if pid, ok := processIdToPid[result.ProcessId]; ok {
- if c, ok := pidToContainer[pid]; ok {
- c.ClearLicenseFuse()
- }
- }
- }
- }
- }
- if hasQuotaError {
- klog.Warnf("[heartbeat batch] Quota error detected, LicenseFuse triggered for affected containers")
- } else {
- klog.Debugf("[heartbeat batch] Successfully reported %d processes, failed %d",
- resp.SuccessCount, resp.FailedCount)
- }
- }
|