serverWorker.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "strings"
  7. "sync"
  8. "github.com/coroot/coroot-node-agent/flags"
  9. "github.com/coroot/coroot-node-agent/utils"
  10. . "github.com/coroot/coroot-node-agent/utils/modelse"
  11. log "github.com/sirupsen/logrus"
  12. //
  13. //log "github.com/sirupsen/logrus"
  14. )
  15. const (
  16. registerModel = 0
  17. connectTestModel = iota
  18. syncAgentInfoModel
  19. heartModel
  20. receiveTaskModel
  21. reportTaskResultModel
  22. installReportModel
  23. urlModel = "%s/api/ext/gaia/daemon/%s"
  24. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  25. doopUriKeyword = "doop-agent-api"
  26. )
  27. type ServerWorker interface {
  28. //InstallReport(r ReportRequest) error
  29. RegisterHost(RegisterHostReq) (int, error)
  30. RegisterApp(RegisterAppReq) error
  31. WhiteList(WhiteListReq) (WhiteData, error)
  32. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  33. PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
  34. GetCodeSetting(CodeSettingReq) (CodeSettingResp, error)
  35. HeartbeatBatch(EuspaceHeartBatchRequest) (EuspaceHeartBatchResponse, error)
  36. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  37. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  38. //ReceiveTask([]interface{}) ([]Task, error)
  39. //ReportTaskResult([]TaskResult) ([]byte, error)
  40. //SetToken(token string)
  41. //GetInfo() (string, string)
  42. }
  43. func (w *ServerHTTPWorker) GetCodeSetting(request CodeSettingReq) (CodeSettingResp, error) {
  44. //log.Infof("[get GetCodeSetting] request:%v.", utils.ToString(request))
  45. response := CodeSettingResp{}
  46. result, err := w.requestServer("/api/v70/agent/getCodeSetting", request)
  47. //log.Infof("[get GetCodeSetting] resp data:%v.", string(result))
  48. if err != nil {
  49. return response, err
  50. }
  51. err = json.Unmarshal(result, &response)
  52. if err != nil {
  53. log.WithError(err).Errorf("[get GetCodeSetting] Failed GetCodeSetting request:%v.", utils.ToString(request))
  54. return response, err
  55. }
  56. return response, nil
  57. }
  58. func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) {
  59. //log.Infof("[get all appinfo] request:%v.", utils.ToString(request))
  60. response := EbpfAppResp{}
  61. result, err := w.requestServer("/api/v70/ebpfAppInfo", request)
  62. //log.Infof("[get all appinfo] resp data:%v.", string(result))
  63. if err != nil {
  64. return response, err
  65. }
  66. err = json.Unmarshal(result, &response)
  67. if err != nil {
  68. log.WithError(err).Errorf("[get all appinfo] Failed PullAllAppInfo request:%v.", utils.ToString(request))
  69. return response, err
  70. }
  71. return response, nil
  72. }
  73. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) (int, error) {
  74. //log.Infof("[server register host] request:%v.", utils.ToString(request))
  75. result, err := w.requestServer("/v2/app/registerHost", request)
  76. //log.Infof("[server register host] resp:%v.", string(result))
  77. if err != nil {
  78. log.WithError(err).Errorf("[server register] Failed RegisterHost request:%v.", utils.ToString(request))
  79. return 0, err
  80. }
  81. var response RegisterHostResponse
  82. err = json.Unmarshal(result, &response)
  83. if err != nil {
  84. log.WithError(err).Errorf("[server register] Failed to parse RegisterHost response: %s", string(result))
  85. return 0, err
  86. }
  87. log.Infof("[server register] RegisterHost success, account_id: %d", response.AccountID)
  88. return response.AccountID, nil
  89. }
  90. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  91. //log.Infof("[server register app] request:%v.", utils.ToString(request))
  92. //result, err := w.requestServer("/v2/app/create", request)
  93. _, err := w.requestServer("/v2/app/create", request)
  94. //log.Infof("[server register app] resp data:%v.", string(result))
  95. if err != nil {
  96. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  97. return err
  98. }
  99. if *flags.RegisterAppToDoop {
  100. if len(w.proxyClient.Endpoints) > 0 {
  101. endpointUrl := w.proxyClient.Endpoints[0].Url
  102. if !strings.Contains(endpointUrl, doopUriKeyword) {
  103. doopUriPrefix := "/" + doopUriKeyword
  104. if strings.HasSuffix(endpointUrl, "/") {
  105. doopUriPrefix = doopUriKeyword
  106. }
  107. // 再次调用注册接口,向doop注册app信息
  108. _, err := w.requestServer(doopUriPrefix+"/v2/app/create", request)
  109. if err != nil {
  110. log.WithError(err).Errorf("[server register] Failed RegisterApp to DOOP request:%v.", utils.ToString(request))
  111. return err
  112. }
  113. }
  114. }
  115. }
  116. return nil
  117. }
  118. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  119. //log.Infof("[server whitelist] request:%v.", request.String())
  120. response := WhiteData{}
  121. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  122. //log.Infof("[server whitelist] resp data:%v.", string(result))
  123. if err != nil {
  124. return response, err
  125. }
  126. err = json.Unmarshal(result, &response)
  127. if err != nil {
  128. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteList request:%v.", request.String())
  129. return response, err
  130. }
  131. return response, nil
  132. }
  133. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  134. //log.Infof("[server whitelist] request:%v.", request.String())
  135. response := WhiteDataV2{}
  136. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  137. //log.Infof("[server whitelist] resp data:%v.", string(result))
  138. if err != nil {
  139. return response, err
  140. }
  141. err = json.Unmarshal(result, &response)
  142. if err != nil {
  143. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteListV2 request:%v.", request.String())
  144. return response, err
  145. }
  146. return response, nil
  147. }
  148. func (w *ServerHTTPWorker) HeartbeatBatch(request EuspaceHeartBatchRequest) (EuspaceHeartBatchResponse, error) {
  149. response := EuspaceHeartBatchResponse{}
  150. result, err := w.requestServer("/api/v2/euspace/heartbeat/batch", request)
  151. if err != nil {
  152. return response, err
  153. }
  154. err = json.Unmarshal(result, &response)
  155. if err != nil {
  156. log.WithError(err).Errorf("[server heartbeat batch] Failed HeartbeatBatch request:%v.", utils.ToString(request))
  157. return response, err
  158. }
  159. return response, nil
  160. }
  161. type ServerHTTPWorker struct {
  162. currentIndex int // 标识当前使用的是那个链接方式
  163. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  164. mux *sync.Mutex
  165. accountID int
  166. hostID string
  167. registUrl string
  168. installReport string
  169. heartUrl string
  170. taskUrl string
  171. returnTaskUrl string
  172. installAgentUrl string
  173. token string
  174. connectTestUrl string
  175. doccServer string
  176. daemonId string
  177. connectList []string
  178. tokenList []string
  179. //nodeInfo *NodeInfo
  180. prefix string
  181. proxyClient *Client
  182. }
  183. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  184. s := &ServerHTTPWorker{
  185. accountID: 1,
  186. //daemonId: daemonId,
  187. //currentIndex: -1,
  188. //connectList: strings.Split(connect, ","),
  189. //tokenList: strings.Split(token, ","),
  190. mux: new(sync.Mutex),
  191. }
  192. s.prefix = *flags.ServerPrefix
  193. s.proxyClient, _ = GetProxyClient()
  194. //s.nodeInfo,_ = node.GetNodeInfo()
  195. return s, nil
  196. }
  197. // func (w *serverHTTPWorker) updateUrl(index int) int {
  198. // w.mux.Lock()
  199. // defer w.mux.Unlock()
  200. // // if index != w.currentIndex {
  201. // // return w.currentIndex
  202. // // }
  203. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  204. // connect := ""
  205. // length := len(w.tokenList)
  206. // if length > 0 {
  207. // w.token = w.tokenList[0]
  208. // }
  209. //
  210. // // if !strings.HasSuffix(connect, "http") {
  211. // // connect = fmt.Sprintf("http://%s", connect)
  212. // // }
  213. //
  214. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  215. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  216. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  217. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  218. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  219. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  220. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  221. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  222. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  223. // return w.currentIndex
  224. // }
  225. //
  226. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  227. // response := RegistorRespData{}
  228. // data, err := json.Marshal(r)
  229. // if err != nil {
  230. // log.Errorf("register marshal request error:%s.", err)
  231. // return response, err
  232. // }
  233. // log.Infof("register request %s", string(data))
  234. // result, err := w.requestServer(registerModel, data)
  235. // if err != nil {
  236. // return response, err
  237. // }
  238. //
  239. // err = json.Unmarshal(result, &response)
  240. // if err != nil {
  241. // log.Errorf("register unmarshal response error:%s.", err)
  242. // return response, err
  243. // }
  244. //
  245. // return response, nil
  246. // }
  247. //
  248. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  249. // data, err := json.Marshal(r)
  250. // if err != nil {
  251. // log.Errorf("install report marshal request error:%s.", err)
  252. // return err
  253. // }
  254. // log.Infof("install report request %s", string(data))
  255. // if _, err := w.requestServer(installReportModel, data); err != nil {
  256. // return err
  257. // }
  258. // return nil
  259. // }
  260. //
  261. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  262. // data, err := json.Marshal(r)
  263. // if err != nil {
  264. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  265. // return err
  266. // }
  267. // log.Infof("ConnectTest request:%s.", string(data))
  268. // _, err = w.requestServer(connectTestModel, data)
  269. // return err
  270. // }
  271. //
  272. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  273. // response := RegistorRespData{}
  274. // data, err := json.Marshal(r)
  275. // if err != nil {
  276. // log.Errorf("sync agent runner serialize error:%s.", err)
  277. // return response.Agents, err
  278. // }
  279. // log.Infof("sync agent runner request:%s.", string(data))
  280. // result, err := w.requestServer(syncAgentInfoModel, data)
  281. // if err != nil {
  282. // return response.Agents, err
  283. // }
  284. //
  285. // err = json.Unmarshal(result, &response)
  286. // if err != nil {
  287. // log.Errorf("sync agent runner parser error return:%s.", err)
  288. // return response.Agents, err
  289. // }
  290. //
  291. // return response.Agents, nil
  292. // }
  293. //
  294. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  295. // log.Infof("heartbeat request:%v.", request)
  296. // response := HB_Config{}
  297. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  298. // data, err := json.Marshal(request)
  299. // if err != nil {
  300. // log.Errorf("heartbeat marshal request error:%s.", err)
  301. // return response.HB_Config_Detail, err
  302. // }
  303. //
  304. // log.Infof("heartbeat request:%s.", string(data))
  305. // result, err := w.requestServer(heartModel, data)
  306. // if err != nil {
  307. // return response.HB_Config_Detail, err
  308. // }
  309. //
  310. // err = json.Unmarshal(result, &response)
  311. // if err != nil {
  312. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  313. // return response.HB_Config_Detail, err
  314. // }
  315. // return response.HB_Config_Detail, nil
  316. // }
  317. //
  318. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  319. // defer func() {
  320. // if err := recover(); err != nil {
  321. // log.Errorf("get task panic, error is <%v>", err)
  322. // }
  323. // }()
  324. // data, err := json.Marshal(request)
  325. // if err != nil {
  326. // log.Errorf("get task serialize get task request, error is <%s>", err)
  327. // return nil, err
  328. // }
  329. //
  330. // log.Infof("get task request:%s.", string(data))
  331. //
  332. // result := RespJson{}
  333. // // index := w.getIndex()
  334. // // for i := 0; i <= 0; i++ {
  335. // url := w.taskUrl
  336. // // fmt.Println("ReceiveTask:", url)
  337. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  338. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  339. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  340. // if err != nil {
  341. // log.WithError(err).Errorf("proxy server error")
  342. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  343. // }
  344. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  345. //
  346. // err = json.Unmarshal(response, &result)
  347. // if err != nil {
  348. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  349. // url, err, string(response))
  350. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  351. // url, err, string(response))
  352. //
  353. // }
  354. //
  355. // if result.Code != 100000 {
  356. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  357. // url, result.Code, result.Msg)
  358. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  359. // url, result.Code, result.Msg)
  360. // }
  361. //
  362. // if err != nil {
  363. // log.Errorf("failed with request task, error is <%v>", err)
  364. // return nil, err
  365. // }
  366. //
  367. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  368. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  369. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  370. //
  371. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  372. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  373. // if err != nil {
  374. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  375. // return nil, err
  376. // }
  377. //
  378. // log.Infof("task info is <%s>", string(plainText))
  379. // err = json.Unmarshal(plainText, &tasks)
  380. // if err != nil {
  381. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  382. // return nil, err
  383. // }
  384. //
  385. // return tasks, nil
  386. // }
  387. //
  388. // err = json.Unmarshal(result.Data, &tasks)
  389. // if err != nil {
  390. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  391. // return nil, err
  392. // }
  393. // return tasks, nil
  394. // }
  395. //
  396. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  397. // data, err := json.Marshal(taskResult)
  398. // if err != nil {
  399. // log.Errorf("report task result serialize error:%s.", err)
  400. // return nil, err
  401. // }
  402. // log.Infof("report task result request:%s.", string(data))
  403. //
  404. // data, err = w.requestServer(reportTaskResultModel, data)
  405. // return data, err
  406. // }
  407. //
  408. // func (w *serverHTTPWorker) getIndex() int {
  409. // w.mux.Lock()
  410. // defer w.mux.Unlock()
  411. // return w.currentIndex
  412. // }
  413. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  414. uri = w.prefix + uri
  415. byteData, err := json.Marshal(data)
  416. if err != nil {
  417. log.WithError(err).Errorf("[requestServer] marshal request error.")
  418. return byteData, err
  419. }
  420. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  421. map[string]string{"Content-Type": "application/json"})
  422. if err != nil {
  423. log.WithError(err).Errorf("[requestServer] proxy server error")
  424. // index = w.updateUrl(index)
  425. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  426. }
  427. log.Infof("url %s response %s", uri, string(response))
  428. result := RespJson{}
  429. err = json.Unmarshal(response, &result)
  430. if err != nil {
  431. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  432. // index = w.updateUrl(index)
  433. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  434. string(response))
  435. }
  436. if result.Code != 1000 {
  437. // index = w.updateUrl(index)
  438. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  439. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  440. result.Msg)
  441. }
  442. return result.Data, nil
  443. }
  444. //
  445. //func (w *serverHTTPWorker) SetToken(token string) {
  446. // w.token = token
  447. //}
  448. //
  449. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  450. // return w.doccServer, w.token
  451. //}