serverWorker.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "strings"
  7. "sync"
  8. "github.com/coroot/coroot-node-agent/flags"
  9. "github.com/coroot/coroot-node-agent/utils"
  10. . "github.com/coroot/coroot-node-agent/utils/modelse"
  11. log "github.com/sirupsen/logrus"
  12. //
  13. //log "github.com/sirupsen/logrus"
  14. )
  15. const (
  16. registerModel = 0
  17. connectTestModel = iota
  18. syncAgentInfoModel
  19. heartModel
  20. receiveTaskModel
  21. reportTaskResultModel
  22. installReportModel
  23. urlModel = "%s/api/ext/gaia/daemon/%s"
  24. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  25. doopUriKeyword = "doop-agent-api"
  26. )
  27. type ServerWorker interface {
  28. //InstallReport(r ReportRequest) error
  29. RegisterHost(RegisterHostReq) (int, error)
  30. RegisterApp(RegisterAppReq) error
  31. WhiteList(WhiteListReq) (WhiteData, error)
  32. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  33. PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
  34. GetCodeSetting(CodeSettingReq) (CodeSettingResp, error)
  35. HeartbeatBatch(EuspaceHeartBatchRequest) (EuspaceHeartBatchResponse, error)
  36. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  37. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  38. //ReceiveTask([]interface{}) ([]Task, error)
  39. //ReportTaskResult([]TaskResult) ([]byte, error)
  40. //SetToken(token string)
  41. //GetInfo() (string, string)
  42. }
  43. func (w *ServerHTTPWorker) GetCodeSetting(request CodeSettingReq) (CodeSettingResp, error) {
  44. //log.Infof("[get GetCodeSetting] request:%v.", utils.ToString(request))
  45. response := CodeSettingResp{}
  46. result, err := w.requestServer("/api/v70/agent/getCodeSetting", request)
  47. //log.Infof("[get GetCodeSetting] resp data:%v.", string(result))
  48. if err != nil {
  49. return response, err
  50. }
  51. err = json.Unmarshal(result, &response)
  52. if err != nil {
  53. log.WithError(err).Errorf("[get GetCodeSetting] Failed GetCodeSetting request:%v.", utils.ToString(request))
  54. return response, err
  55. }
  56. return response, nil
  57. }
  58. func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) {
  59. //log.Infof("[get all appinfo] request:%v.", utils.ToString(request))
  60. response := EbpfAppResp{}
  61. result, err := w.requestServer("/api/v70/ebpfAppInfo", request)
  62. //log.Infof("[get all appinfo] resp data:%v.", string(result))
  63. if err != nil {
  64. return response, err
  65. }
  66. err = json.Unmarshal(result, &response)
  67. if err != nil {
  68. log.WithError(err).Errorf("[get all appinfo] Failed PullAllAppInfo request:%v.", utils.ToString(request))
  69. return response, err
  70. }
  71. return response, nil
  72. }
  73. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) (int, error) {
  74. //log.Infof("[server register host] request:%v.", utils.ToString(request))
  75. result, err := w.requestServer("/v2/app/registerHost", request)
  76. //log.Infof("[server register host] resp:%v.", string(result))
  77. if err != nil {
  78. log.WithError(err).Errorf("[server register] Failed RegisterHost request:%v.", utils.ToString(request))
  79. return 0, err
  80. }
  81. var response RegisterHostResponse
  82. err = json.Unmarshal(result, &response)
  83. if err != nil {
  84. log.WithError(err).Errorf("[server register] Failed to parse RegisterHost response: %s", string(result))
  85. return 0, err
  86. }
  87. log.Infof("[server register] RegisterHost success, account_id: %d", response.AccountID)
  88. return response.AccountID, nil
  89. }
  90. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  91. //log.Infof("[server register app] request:%v.", utils.ToString(request))
  92. //result, err := w.requestServer("/v2/app/create", request)
  93. _, err := w.requestServer("/v2/app/create", request)
  94. //log.Infof("[server register app] resp data:%v.", string(result))
  95. if err != nil {
  96. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  97. return err
  98. }
  99. if *flags.RegisterAppToDoop {
  100. if len(w.proxyClient.Endpoints) > 0 {
  101. endpointUrl := w.proxyClient.Endpoints[0].Url
  102. if !strings.Contains(endpointUrl, doopUriKeyword) {
  103. doopUriPrefix := "/" + doopUriKeyword
  104. if strings.HasSuffix(endpointUrl, "/") {
  105. doopUriPrefix = doopUriKeyword
  106. }
  107. // 再次调用注册接口,向doop注册app信息
  108. _, err := w.requestServer(doopUriPrefix+"/v2/app/create", request)
  109. if err != nil {
  110. log.WithError(err).Errorf("[server register] Failed RegisterApp to DOOP request:%v.", utils.ToString(request))
  111. return err
  112. }
  113. }
  114. }
  115. }
  116. return nil
  117. }
  118. // unused
  119. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  120. //log.Infof("[server whitelist] request:%v.", request.String())
  121. response := WhiteData{}
  122. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  123. //log.Infof("[server whitelist] resp data:%v.", string(result))
  124. if err != nil {
  125. return response, err
  126. }
  127. err = json.Unmarshal(result, &response)
  128. if err != nil {
  129. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteList request:%v.", request.String())
  130. return response, err
  131. }
  132. return response, nil
  133. }
  134. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  135. //log.Infof("[server whitelist] request:%v.", request.String())
  136. response := WhiteDataV2{}
  137. if strings.Contains(*flags.ServerPrefix, doopUriKeyword) {
  138. log.Infof("[server whitelist] skip WhiteListV2, server_prefix contains %s", doopUriKeyword)
  139. return response, nil
  140. }
  141. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  142. //log.Infof("[server whitelist] resp data:%v.", string(result))
  143. if err != nil {
  144. return response, err
  145. }
  146. err = json.Unmarshal(result, &response)
  147. if err != nil {
  148. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteListV2 request:%v.", request.String())
  149. return response, err
  150. }
  151. return response, nil
  152. }
  153. func (w *ServerHTTPWorker) HeartbeatBatch(request EuspaceHeartBatchRequest) (EuspaceHeartBatchResponse, error) {
  154. response := EuspaceHeartBatchResponse{}
  155. if strings.Contains(*flags.ServerPrefix, doopUriKeyword) {
  156. log.Infof("[server heartbeat batch] skip HeartbeatBatch, server_prefix contains %s", doopUriKeyword)
  157. return response, nil
  158. }
  159. result, err := w.requestServer("/v2/euspace/heartbeat/batch", request)
  160. if err != nil {
  161. return response, err
  162. }
  163. err = json.Unmarshal(result, &response)
  164. if err != nil {
  165. log.WithError(err).Errorf("[server heartbeat batch] Failed HeartbeatBatch request:%v.", utils.ToString(request))
  166. return response, err
  167. }
  168. return response, nil
  169. }
  170. type ServerHTTPWorker struct {
  171. currentIndex int // 标识当前使用的是那个链接方式
  172. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  173. mux *sync.Mutex
  174. accountID int
  175. hostID string
  176. registUrl string
  177. installReport string
  178. heartUrl string
  179. taskUrl string
  180. returnTaskUrl string
  181. installAgentUrl string
  182. token string
  183. connectTestUrl string
  184. doccServer string
  185. daemonId string
  186. connectList []string
  187. tokenList []string
  188. //nodeInfo *NodeInfo
  189. prefix string
  190. proxyClient *Client
  191. }
  192. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  193. s := &ServerHTTPWorker{
  194. accountID: 1,
  195. //daemonId: daemonId,
  196. //currentIndex: -1,
  197. //connectList: strings.Split(connect, ","),
  198. //tokenList: strings.Split(token, ","),
  199. mux: new(sync.Mutex),
  200. }
  201. s.prefix = *flags.ServerPrefix
  202. s.proxyClient, _ = GetProxyClient()
  203. //s.nodeInfo,_ = node.GetNodeInfo()
  204. return s, nil
  205. }
  206. // func (w *serverHTTPWorker) updateUrl(index int) int {
  207. // w.mux.Lock()
  208. // defer w.mux.Unlock()
  209. // // if index != w.currentIndex {
  210. // // return w.currentIndex
  211. // // }
  212. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  213. // connect := ""
  214. // length := len(w.tokenList)
  215. // if length > 0 {
  216. // w.token = w.tokenList[0]
  217. // }
  218. //
  219. // // if !strings.HasSuffix(connect, "http") {
  220. // // connect = fmt.Sprintf("http://%s", connect)
  221. // // }
  222. //
  223. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  224. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  225. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  226. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  227. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  228. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  229. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  230. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  231. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  232. // return w.currentIndex
  233. // }
  234. //
  235. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  236. // response := RegistorRespData{}
  237. // data, err := json.Marshal(r)
  238. // if err != nil {
  239. // log.Errorf("register marshal request error:%s.", err)
  240. // return response, err
  241. // }
  242. // log.Infof("register request %s", string(data))
  243. // result, err := w.requestServer(registerModel, data)
  244. // if err != nil {
  245. // return response, err
  246. // }
  247. //
  248. // err = json.Unmarshal(result, &response)
  249. // if err != nil {
  250. // log.Errorf("register unmarshal response error:%s.", err)
  251. // return response, err
  252. // }
  253. //
  254. // return response, nil
  255. // }
  256. //
  257. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  258. // data, err := json.Marshal(r)
  259. // if err != nil {
  260. // log.Errorf("install report marshal request error:%s.", err)
  261. // return err
  262. // }
  263. // log.Infof("install report request %s", string(data))
  264. // if _, err := w.requestServer(installReportModel, data); err != nil {
  265. // return err
  266. // }
  267. // return nil
  268. // }
  269. //
  270. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  271. // data, err := json.Marshal(r)
  272. // if err != nil {
  273. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  274. // return err
  275. // }
  276. // log.Infof("ConnectTest request:%s.", string(data))
  277. // _, err = w.requestServer(connectTestModel, data)
  278. // return err
  279. // }
  280. //
  281. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  282. // response := RegistorRespData{}
  283. // data, err := json.Marshal(r)
  284. // if err != nil {
  285. // log.Errorf("sync agent runner serialize error:%s.", err)
  286. // return response.Agents, err
  287. // }
  288. // log.Infof("sync agent runner request:%s.", string(data))
  289. // result, err := w.requestServer(syncAgentInfoModel, data)
  290. // if err != nil {
  291. // return response.Agents, err
  292. // }
  293. //
  294. // err = json.Unmarshal(result, &response)
  295. // if err != nil {
  296. // log.Errorf("sync agent runner parser error return:%s.", err)
  297. // return response.Agents, err
  298. // }
  299. //
  300. // return response.Agents, nil
  301. // }
  302. //
  303. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  304. // log.Infof("heartbeat request:%v.", request)
  305. // response := HB_Config{}
  306. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  307. // data, err := json.Marshal(request)
  308. // if err != nil {
  309. // log.Errorf("heartbeat marshal request error:%s.", err)
  310. // return response.HB_Config_Detail, err
  311. // }
  312. //
  313. // log.Infof("heartbeat request:%s.", string(data))
  314. // result, err := w.requestServer(heartModel, data)
  315. // if err != nil {
  316. // return response.HB_Config_Detail, err
  317. // }
  318. //
  319. // err = json.Unmarshal(result, &response)
  320. // if err != nil {
  321. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  322. // return response.HB_Config_Detail, err
  323. // }
  324. // return response.HB_Config_Detail, nil
  325. // }
  326. //
  327. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  328. // defer func() {
  329. // if err := recover(); err != nil {
  330. // log.Errorf("get task panic, error is <%v>", err)
  331. // }
  332. // }()
  333. // data, err := json.Marshal(request)
  334. // if err != nil {
  335. // log.Errorf("get task serialize get task request, error is <%s>", err)
  336. // return nil, err
  337. // }
  338. //
  339. // log.Infof("get task request:%s.", string(data))
  340. //
  341. // result := RespJson{}
  342. // // index := w.getIndex()
  343. // // for i := 0; i <= 0; i++ {
  344. // url := w.taskUrl
  345. // // fmt.Println("ReceiveTask:", url)
  346. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  347. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  348. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  349. // if err != nil {
  350. // log.WithError(err).Errorf("proxy server error")
  351. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  352. // }
  353. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  354. //
  355. // err = json.Unmarshal(response, &result)
  356. // if err != nil {
  357. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  358. // url, err, string(response))
  359. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  360. // url, err, string(response))
  361. //
  362. // }
  363. //
  364. // if result.Code != 100000 {
  365. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  366. // url, result.Code, result.Msg)
  367. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  368. // url, result.Code, result.Msg)
  369. // }
  370. //
  371. // if err != nil {
  372. // log.Errorf("failed with request task, error is <%v>", err)
  373. // return nil, err
  374. // }
  375. //
  376. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  377. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  378. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  379. //
  380. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  381. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  382. // if err != nil {
  383. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  384. // return nil, err
  385. // }
  386. //
  387. // log.Infof("task info is <%s>", string(plainText))
  388. // err = json.Unmarshal(plainText, &tasks)
  389. // if err != nil {
  390. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  391. // return nil, err
  392. // }
  393. //
  394. // return tasks, nil
  395. // }
  396. //
  397. // err = json.Unmarshal(result.Data, &tasks)
  398. // if err != nil {
  399. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  400. // return nil, err
  401. // }
  402. // return tasks, nil
  403. // }
  404. //
  405. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  406. // data, err := json.Marshal(taskResult)
  407. // if err != nil {
  408. // log.Errorf("report task result serialize error:%s.", err)
  409. // return nil, err
  410. // }
  411. // log.Infof("report task result request:%s.", string(data))
  412. //
  413. // data, err = w.requestServer(reportTaskResultModel, data)
  414. // return data, err
  415. // }
  416. //
  417. // func (w *serverHTTPWorker) getIndex() int {
  418. // w.mux.Lock()
  419. // defer w.mux.Unlock()
  420. // return w.currentIndex
  421. // }
  422. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  423. uri = *flags.ServerPrefix + uri
  424. byteData, err := json.Marshal(data)
  425. if err != nil {
  426. log.WithError(err).Errorf("[requestServer] marshal request error.")
  427. return byteData, err
  428. }
  429. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  430. map[string]string{"Content-Type": "application/json"})
  431. if err != nil {
  432. log.WithError(err).Errorf("[requestServer] proxy server error")
  433. // index = w.updateUrl(index)
  434. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  435. }
  436. log.Infof("url %s response %s", uri, string(response))
  437. result := RespJson{}
  438. err = json.Unmarshal(response, &result)
  439. if err != nil {
  440. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  441. // index = w.updateUrl(index)
  442. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  443. string(response))
  444. }
  445. if result.Code != 1000 {
  446. // index = w.updateUrl(index)
  447. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  448. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  449. result.Msg)
  450. }
  451. return result.Data, nil
  452. }
  453. //
  454. //func (w *serverHTTPWorker) SetToken(token string) {
  455. // w.token = token
  456. //}
  457. //
  458. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  459. // return w.doccServer, w.token
  460. //}