apm_heartbeat_batch.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package containers
import (
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	"github.com/coroot/coroot-node-agent/flags"
	"github.com/coroot/coroot-node-agent/utils"
	. "github.com/coroot/coroot-node-agent/utils/modelse"
	klog "github.com/sirupsen/logrus"
)
  12. const (
  13. HeartbeatBatchInterval = 60 * time.Second // 心跳间隔 60 秒
  14. MaxBatchSize = 1000 // 单次最多 1000 条
  15. )
  16. // GenerateProcessId 生成进程唯一标识
  17. // 格式: {serviceType}-{serviceName}-{hash}
  18. // hash 生成规则: hash(systemuuid + ip + port)
  19. func GenerateProcessId(serviceType, serviceName, systemUUID, ip string, port int) string {
  20. if systemUUID == "" {
  21. systemUUID = utils.GetSystemUUID()
  22. }
  23. // 生成 hash: hash(systemuuid + ip + port)
  24. hashInput := fmt.Sprintf("%s%s%d", systemUUID, ip, port)
  25. hash := utils.HashTo16CharString(hashInput)
  26. // 格式: {serviceType}-{serviceName}-{hash}
  27. return fmt.Sprintf("%s-%s-%s", serviceType, serviceName, hash)
  28. }
  29. // CollectProcessHeartbeatInfo 收集所有容器的进程心跳信息
  30. func (r *Registry) CollectProcessHeartbeatInfo() []ProcessHeartbeatInfo {
  31. var processes []ProcessHeartbeatInfo
  32. nodeInfo := r.nodeInfo.GetNodeInfo()
  33. if nodeInfo == nil {
  34. return processes
  35. }
  36. systemUUID := nodeInfo.SystemUUID
  37. if systemUUID == "" {
  38. systemUUID = utils.GetSystemUUID()
  39. }
  40. // 使用 RegistryApps 获取进程信息(已在 handleEvents 中更新)
  41. // 为了安全,我们创建一个快照
  42. appsSnapshot := make(map[uint32]AppStatusInfo)
  43. for pid, app := range r.RegistryApps {
  44. appsSnapshot[pid] = app
  45. }
  46. // 遍历所有注册的应用
  47. for pid, appInfo := range appsSnapshot {
  48. // 获取服务类型
  49. serviceType := appInfo.Language
  50. if serviceType == "" {
  51. serviceType = "UNKNOWN"
  52. }
  53. // 获取 IP 和端口
  54. ip := ""
  55. port := 0
  56. if appInfo.Sn != "" {
  57. // Sn 格式可能是 "ip:port" 或 "ip1:port1,ip2:port2"(多个地址用逗号分隔)
  58. // 取第一个地址
  59. firstAddr := appInfo.Sn
  60. if idx := strings.Index(appInfo.Sn, ","); idx > 0 {
  61. firstAddr = appInfo.Sn[:idx]
  62. }
  63. // 解析 ip:port
  64. parts := strings.Split(firstAddr, ":")
  65. if len(parts) >= 1 {
  66. ip = parts[0]
  67. }
  68. if len(parts) >= 2 {
  69. if p, err := strconv.Atoi(parts[1]); err == nil {
  70. port = p
  71. }
  72. }
  73. }
  74. if ip == "" {
  75. ip = nodeInfo.HostIp
  76. }
  77. if port == 0 {
  78. port = appInfo.Sport
  79. }
  80. // 如果 port 仍然为 0,使用默认值或跳过(根据业务需求)
  81. // 这里保留 port=0,因为有些服务可能没有监听端口
  82. // 生成 processId
  83. processId := GenerateProcessId(serviceType, appInfo.AppName, systemUUID, ip, port)
  84. // 判断进程状态:根据 AppInfo.Status 判断
  85. // status=1: 运行中,占用配额
  86. // status=2: 已停止,释放配额
  87. status := 1 // 运行中
  88. if appInfo.Status == APP_UNINSTALL || appInfo.Status == APP_FUSE || appInfo.Status == APP_LICENSE_FUSE {
  89. status = 2 // 已停止
  90. }
  91. // 注意:APP_UPROBE_ERROR 和 APP_STACK_ERROR 虽然表示错误,但进程仍在运行,所以 status=1
  92. processInfo := ProcessHeartbeatInfo{
  93. ProcessId: processId,
  94. ServiceName: appInfo.AppName,
  95. ServiceType: serviceType,
  96. Pid: int(pid),
  97. Ip: ip,
  98. Port: port,
  99. Timestamp: time.Now().UnixMilli(),
  100. Status: status,
  101. }
  102. processes = append(processes, processInfo)
  103. // 限制批量大小
  104. if len(processes) >= MaxBatchSize {
  105. break
  106. }
  107. }
  108. return processes
  109. }
  110. // TaskHeartbeatBatch 心跳批量上报任务
  111. func (r *Registry) TaskHeartbeatBatch() {
  112. ticker := time.NewTicker(HeartbeatBatchInterval)
  113. defer ticker.Stop()
  114. for {
  115. select {
  116. case <-ticker.C:
  117. r.DoHeartbeatBatch()
  118. }
  119. }
  120. }
  121. // DoHeartbeatBatch 执行心跳批量上报
  122. func (r *Registry) DoHeartbeatBatch() error {
  123. nodeInfo := r.nodeInfo.GetNodeInfo()
  124. if nodeInfo == nil {
  125. return fmt.Errorf("nodeInfo is nil")
  126. }
  127. if nodeInfo.AccountID == 0 {
  128. klog.Debugf("[heartbeat batch] AccountID is 0, skip heartbeat")
  129. return nil
  130. }
  131. // 收集进程信息
  132. processes := r.CollectProcessHeartbeatInfo()
  133. if len(processes) == 0 {
  134. klog.Debugf("[heartbeat batch] No processes to report")
  135. return nil
  136. }
  137. // 分批上报(每批最多 1000 条)
  138. for i := 0; i < len(processes); i += MaxBatchSize {
  139. end := i + MaxBatchSize
  140. if end > len(processes) {
  141. end = len(processes)
  142. }
  143. batch := processes[i:end]
  144. err := r.sendHeartbeatBatch(nodeInfo, batch)
  145. if err != nil {
  146. klog.WithError(err).Errorf("[heartbeat batch] Failed to send batch %d-%d", i, end-1)
  147. // 继续处理下一批,不中断
  148. }
  149. }
  150. return nil
  151. }
  152. // sendHeartbeatBatch 发送单批心跳数据
  153. func (r *Registry) sendHeartbeatBatch(nodeInfo *NodeInfoT, processes []ProcessHeartbeatInfo) error {
  154. // 获取 euspaceAgentId(使用 HostID)
  155. euspaceAgentId := nodeInfo.HostID
  156. if euspaceAgentId == 0 {
  157. euspaceAgentId = utils.GetHostID()
  158. }
  159. req := EuspaceHeartBatchRequest{
  160. AccountId: nodeInfo.AccountID,
  161. EuspaceAgentId: euspaceAgentId,
  162. EuspaceVersion: nodeInfo.AgentVersion,
  163. HostIp: nodeInfo.HostIp,
  164. Processes: processes,
  165. }
  166. resp, err := r.connServer.HeartbeatBatch(req)
  167. if err != nil {
  168. return fmt.Errorf("heartbeat batch API call failed: %w", err)
  169. }
  170. // 处理响应,检查配额错误并触发 LicenseFuse
  171. r.handleHeartbeatBatchResponse(resp, processes)
  172. return nil
  173. }
  174. // handleHeartbeatBatchResponse 处理心跳批量上报响应
  175. func (r *Registry) handleHeartbeatBatchResponse(resp EuspaceHeartBatchResponse, processes []ProcessHeartbeatInfo) {
  176. // 创建 processId 到 PID 的映射
  177. processIdToPid := make(map[string]uint32)
  178. for _, proc := range processes {
  179. processIdToPid[proc.ProcessId] = uint32(proc.Pid)
  180. }
  181. pidToContainer := make(map[uint32]*Container)
  182. for pid := range r.RegistryApps {
  183. // 尝试从 containersByPid 获取容器(不持有锁,因为只是读取)
  184. if c, ok := r.containersByPid[pid]; ok && c != nil {
  185. pidToContainer[pid] = c
  186. }
  187. }
  188. // 检查响应结果数量是否匹配
  189. if len(resp.Results) != len(processes) {
  190. klog.Warnf("[heartbeat batch] Response results count (%d) doesn't match processes count (%d)",
  191. len(resp.Results), len(processes))
  192. }
  193. // 处理每个结果(仅在开启 License 校验时)
  194. hasQuotaError := false
  195. if *flags.EnableLicenseCheck {
  196. for _, result := range resp.Results {
  197. if !result.Success {
  198. // 检查是否是配额相关错误
  199. if result.ErrorCode == "QUOTA_EXCEED_MAX" || result.ErrorCode == "QUOTA_TIME_EXCEED_LICENSE" {
  200. hasQuotaError = true
  201. // 找到对应的容器并触发 LicenseFuse
  202. if pid, ok := processIdToPid[result.ProcessId]; ok {
  203. if c, ok := pidToContainer[pid]; ok {
  204. c.LicenseFuse()
  205. klog.Warnf("[heartbeat batch] LicenseFuse triggered for processId=%s, pid=%d, errorCode=%s, errorMessage=%s",
  206. result.ProcessId, pid, result.ErrorCode, result.ErrorMessage)
  207. } else {
  208. klog.Warnf("[heartbeat batch] LicenseFuse triggered but container not found for processId=%s, pid=%d",
  209. result.ProcessId, pid)
  210. }
  211. } else {
  212. klog.Warnf("[heartbeat batch] LicenseFuse triggered but pid not found for processId=%s",
  213. result.ProcessId)
  214. }
  215. }
  216. } else {
  217. // 成功时,清除 LicenseFuse 状态
  218. if pid, ok := processIdToPid[result.ProcessId]; ok {
  219. if c, ok := pidToContainer[pid]; ok {
  220. c.ClearLicenseFuse()
  221. }
  222. }
  223. }
  224. }
  225. }
  226. if hasQuotaError {
  227. klog.Warnf("[heartbeat batch] Quota error detected, LicenseFuse triggered for affected containers")
  228. } else {
  229. klog.Debugf("[heartbeat batch] Successfully reported %d processes, failed %d",
  230. resp.SuccessCount, resp.FailedCount)
  231. }
  232. }