apm_heartbeat_batch.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package containers
  2. import (
  3. "fmt"
  4. "strconv"
  5. "strings"
  6. "time"
  7. "github.com/coroot/coroot-node-agent/flags"
  8. "github.com/coroot/coroot-node-agent/utils"
  9. . "github.com/coroot/coroot-node-agent/utils/modelse"
  10. klog "github.com/sirupsen/logrus"
  11. )
  const (
  	// MaxBatchSize is the maximum number of process heartbeats sent in a
  	// single HeartbeatBatch request.
  	MaxBatchSize = 1000
  	// HeartbeatBatchInterval is the period between heartbeat rounds in
  	// TaskHeartbeatBatch (one minute).
  	HeartbeatBatchInterval = time.Minute
  )
  16. // GenerateProcessId 生成进程唯一标识
  17. // 格式: {serviceType}-{serviceName}-{hash}
  18. // hash 生成规则: hash(systemuuid + ip + port)
  19. func GenerateProcessId(serviceType, serviceName, systemUUID, ip string, port int) string {
  20. if systemUUID == "" {
  21. systemUUID = utils.GetSystemUUID()
  22. }
  23. // 生成 hash: hash(systemuuid + ip + port)
  24. hashInput := fmt.Sprintf("%s%s%d", systemUUID, ip, port)
  25. hash := utils.HashTo16CharString(hashInput)
  26. // 格式: {serviceType}-{serviceName}-{hash}
  27. return fmt.Sprintf("%s-%s-%s", serviceType, serviceName, hash)
  28. }
  29. // CollectProcessHeartbeatInfo 收集所有容器的进程心跳信息
  30. func (r *Registry) CollectProcessHeartbeatInfo() []ProcessHeartbeatInfo {
  31. var processes []ProcessHeartbeatInfo
  32. nodeInfo := r.nodeInfo.GetNodeInfo()
  33. if nodeInfo == nil {
  34. return processes
  35. }
  36. systemUUID := nodeInfo.SystemUUID
  37. if systemUUID == "" {
  38. systemUUID = utils.GetSystemUUID()
  39. }
  40. // 使用 RegistryApps 获取进程信息(已在 handleEvents 中更新)
  41. // 为了安全,我们创建一个快照
  42. appsSnapshot := make(map[uint32]AppStatusInfo)
  43. for pid, app := range r.RegistryApps {
  44. appsSnapshot[pid] = app
  45. }
  46. // 遍历所有注册的应用
  47. for pid, appInfo := range appsSnapshot {
  48. // 获取服务类型
  49. serviceType := appInfo.Language
  50. if serviceType == "" {
  51. serviceType = "UNKNOWN"
  52. }
  53. // 获取 IP 和端口
  54. ip := ""
  55. port := 0
  56. if appInfo.Sn != "" {
  57. // Sn 格式可能是 "ip:port" 或 "ip1:port1,ip2:port2"(多个地址用逗号分隔)
  58. // 取第一个地址
  59. firstAddr := appInfo.Sn
  60. if idx := strings.Index(appInfo.Sn, ","); idx > 0 {
  61. firstAddr = appInfo.Sn[:idx]
  62. }
  63. // 解析 ip:port
  64. parts := strings.Split(firstAddr, ":")
  65. if len(parts) >= 1 {
  66. ip = parts[0]
  67. }
  68. if len(parts) >= 2 {
  69. if p, err := strconv.Atoi(parts[1]); err == nil {
  70. port = p
  71. }
  72. }
  73. }
  74. if ip == "" {
  75. ip = nodeInfo.HostIp
  76. }
  77. if port == 0 {
  78. port = appInfo.Sport
  79. }
  80. // 如果 port 仍然为 0,使用默认值或跳过(根据业务需求)
  81. // 这里保留 port=0,因为有些服务可能没有监听端口
  82. // 生成 processId
  83. processId := GenerateProcessId(serviceType, appInfo.AppName, systemUUID, ip, port)
  84. // 判断进程状态:根据 AppInfo.Status 判断
  85. // status=1: 运行中,占用配额
  86. // status=2: 已停止,释放配额
  87. status := 1 // 运行中
  88. if appInfo.Status == APP_UNINSTALL || appInfo.Status == APP_FUSE || appInfo.Status == APP_LICENSE_FUSE {
  89. status = 2 // 已停止
  90. }
  91. // 注意:APP_UPROBE_ERROR 和 APP_STACK_ERROR 虽然表示错误,但进程仍在运行,所以 status=1
  92. processInfo := ProcessHeartbeatInfo{
  93. ProcessId: processId,
  94. ServiceName: appInfo.AppName,
  95. ServiceType: serviceType,
  96. Pid: int(pid),
  97. Ip: ip,
  98. Port: port,
  99. Timestamp: time.Now().UnixMilli(),
  100. Status: status,
  101. }
  102. processes = append(processes, processInfo)
  103. // 限制批量大小
  104. if len(processes) >= MaxBatchSize {
  105. break
  106. }
  107. }
  108. return processes
  109. }
  110. // TaskHeartbeatBatch 心跳批量上报任务
  111. func (r *Registry) TaskHeartbeatBatch() {
  112. ticker := time.NewTicker(HeartbeatBatchInterval)
  113. defer ticker.Stop()
  114. for {
  115. select {
  116. case <-ticker.C:
  117. if !*flags.EnableLicenseCheck {
  118. continue
  119. }
  120. r.DoHeartbeatBatch()
  121. }
  122. }
  123. }
  124. // DoHeartbeatBatch 执行心跳批量上报
  125. func (r *Registry) DoHeartbeatBatch() error {
  126. nodeInfo := r.nodeInfo.GetNodeInfo()
  127. if nodeInfo == nil {
  128. return fmt.Errorf("nodeInfo is nil")
  129. }
  130. if nodeInfo.AccountID == 0 {
  131. klog.Debugf("[heartbeat batch] AccountID is 0, skip heartbeat")
  132. return nil
  133. }
  134. // 收集进程信息
  135. processes := r.CollectProcessHeartbeatInfo()
  136. if len(processes) == 0 {
  137. klog.Debugf("[heartbeat batch] No processes to report")
  138. return nil
  139. }
  140. // 分批上报(每批最多 1000 条)
  141. for i := 0; i < len(processes); i += MaxBatchSize {
  142. end := i + MaxBatchSize
  143. if end > len(processes) {
  144. end = len(processes)
  145. }
  146. batch := processes[i:end]
  147. err := r.sendHeartbeatBatch(nodeInfo, batch)
  148. if err != nil {
  149. klog.WithError(err).Errorf("[heartbeat batch] Failed to send batch %d-%d", i, end-1)
  150. // 继续处理下一批,不中断
  151. }
  152. }
  153. return nil
  154. }
  155. // sendHeartbeatBatch 发送单批心跳数据
  156. func (r *Registry) sendHeartbeatBatch(nodeInfo *NodeInfoT, processes []ProcessHeartbeatInfo) error {
  157. // 获取 euspaceAgentId(使用 HostID)
  158. euspaceAgentId := nodeInfo.HostID
  159. if euspaceAgentId == 0 {
  160. euspaceAgentId = utils.GetHostID()
  161. }
  162. req := EuspaceHeartBatchRequest{
  163. AccountId: nodeInfo.AccountID,
  164. EuspaceAgentId: euspaceAgentId,
  165. EuspaceVersion: nodeInfo.AgentVersion,
  166. HostIp: nodeInfo.HostIp,
  167. Processes: processes,
  168. }
  169. resp, err := r.connServer.HeartbeatBatch(req)
  170. if err != nil {
  171. return fmt.Errorf("heartbeat batch API call failed: %w", err)
  172. }
  173. // 处理响应,检查配额错误并触发 LicenseFuse
  174. r.handleHeartbeatBatchResponse(resp, processes)
  175. return nil
  176. }
  177. // handleHeartbeatBatchResponse 处理心跳批量上报响应
  178. func (r *Registry) handleHeartbeatBatchResponse(resp EuspaceHeartBatchResponse, processes []ProcessHeartbeatInfo) {
  179. // 创建 processId 到 PID 的映射
  180. processIdToPid := make(map[string]uint32)
  181. for _, proc := range processes {
  182. processIdToPid[proc.ProcessId] = uint32(proc.Pid)
  183. }
  184. pidToContainer := make(map[uint32]*Container)
  185. for pid := range r.RegistryApps {
  186. // 尝试从 containersByPid 获取容器(不持有锁,因为只是读取)
  187. if c, ok := r.containersByPid[pid]; ok && c != nil {
  188. pidToContainer[pid] = c
  189. }
  190. }
  191. // 检查响应结果数量是否匹配
  192. if len(resp.Results) != len(processes) {
  193. klog.Warnf("[heartbeat batch] Response results count (%d) doesn't match processes count (%d)",
  194. len(resp.Results), len(processes))
  195. }
  196. // 处理每个结果(仅在开启 License 校验时)
  197. hasQuotaError := false
  198. if *flags.EnableLicenseCheck {
  199. for _, result := range resp.Results {
  200. if !result.Success {
  201. // 检查是否是配额相关错误
  202. if result.ErrorCode == "QUOTA_EXCEED_MAX" || result.ErrorCode == "QUOTA_TIME_EXCEED_LICENSE" {
  203. hasQuotaError = true
  204. // 找到对应的容器并触发 LicenseFuse
  205. if pid, ok := processIdToPid[result.ProcessId]; ok {
  206. if c, ok := pidToContainer[pid]; ok {
  207. c.LicenseFuse()
  208. klog.Warnf("[heartbeat batch] LicenseFuse triggered for processId=%s, pid=%d, errorCode=%s, errorMessage=%s",
  209. result.ProcessId, pid, result.ErrorCode, result.ErrorMessage)
  210. } else {
  211. klog.Warnf("[heartbeat batch] LicenseFuse triggered but container not found for processId=%s, pid=%d",
  212. result.ProcessId, pid)
  213. }
  214. } else {
  215. klog.Warnf("[heartbeat batch] LicenseFuse triggered but pid not found for processId=%s",
  216. result.ProcessId)
  217. }
  218. }
  219. } else {
  220. // 成功时,清除 LicenseFuse 状态
  221. if pid, ok := processIdToPid[result.ProcessId]; ok {
  222. if c, ok := pidToContainer[pid]; ok {
  223. c.ClearLicenseFuse()
  224. }
  225. }
  226. }
  227. }
  228. }
  229. if hasQuotaError {
  230. klog.Warnf("[heartbeat batch] Quota error detected, LicenseFuse triggered for affected containers")
  231. } else {
  232. klog.Debugf("[heartbeat batch] Successfully reported %d processes, failed %d",
  233. resp.SuccessCount, resp.FailedCount)
  234. }
  235. }