serverWorker.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "strings"
  7. "sync"
  8. "github.com/coroot/coroot-node-agent/flags"
  9. "github.com/coroot/coroot-node-agent/utils"
  10. . "github.com/coroot/coroot-node-agent/utils/modelse"
  11. log "github.com/sirupsen/logrus"
  12. //
  13. //log "github.com/sirupsen/logrus"
  14. )
  15. const (
  16. registerModel = 0
  17. connectTestModel = iota
  18. syncAgentInfoModel
  19. heartModel
  20. receiveTaskModel
  21. reportTaskResultModel
  22. installReportModel
  23. urlModel = "%s/api/ext/gaia/daemon/%s"
  24. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  25. doopUriKeyword = "doop-agent-api"
  26. )
  27. type ServerWorker interface {
  28. //InstallReport(r ReportRequest) error
  29. RegisterHost(RegisterHostReq) (int, error)
  30. RegisterApp(RegisterAppReq) error
  31. WhiteList(WhiteListReq) (WhiteData, error)
  32. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  33. PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
  34. GetCodeSetting(CodeSettingReq) (CodeSettingResp, error)
  35. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  36. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  37. //ReceiveTask([]interface{}) ([]Task, error)
  38. //ReportTaskResult([]TaskResult) ([]byte, error)
  39. //SetToken(token string)
  40. //GetInfo() (string, string)
  41. }
  42. func (w *ServerHTTPWorker) GetCodeSetting(request CodeSettingReq) (CodeSettingResp, error) {
  43. //log.Infof("[get GetCodeSetting] request:%v.", utils.ToString(request))
  44. response := CodeSettingResp{}
  45. result, err := w.requestServer("/api/v70/agent/getCodeSetting", request)
  46. //log.Infof("[get GetCodeSetting] resp data:%v.", string(result))
  47. if err != nil {
  48. return response, err
  49. }
  50. err = json.Unmarshal(result, &response)
  51. if err != nil {
  52. log.WithError(err).Errorf("[get GetCodeSetting] Failed GetCodeSetting request:%v.", utils.ToString(request))
  53. return response, err
  54. }
  55. return response, nil
  56. }
  57. func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) {
  58. //log.Infof("[get all appinfo] request:%v.", utils.ToString(request))
  59. response := EbpfAppResp{}
  60. result, err := w.requestServer("/api/v70/ebpfAppInfo", request)
  61. //log.Infof("[get all appinfo] resp data:%v.", string(result))
  62. if err != nil {
  63. return response, err
  64. }
  65. err = json.Unmarshal(result, &response)
  66. if err != nil {
  67. log.WithError(err).Errorf("[get all appinfo] Failed PullAllAppInfo request:%v.", utils.ToString(request))
  68. return response, err
  69. }
  70. return response, nil
  71. }
  72. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) (int, error) {
  73. //log.Infof("[server register host] request:%v.", utils.ToString(request))
  74. result, err := w.requestServer("/v2/app/registerHost", request)
  75. //log.Infof("[server register host] resp:%v.", string(result))
  76. if err != nil {
  77. log.WithError(err).Errorf("[server register] Failed RegisterHost request:%v.", utils.ToString(request))
  78. return 0, err
  79. }
  80. var response RegisterHostResponse
  81. err = json.Unmarshal(result, &response)
  82. if err != nil {
  83. log.WithError(err).Errorf("[server register] Failed to parse RegisterHost response: %s", string(result))
  84. return 0, err
  85. }
  86. log.Infof("[server register] RegisterHost success, account_id: %d", response.AccountID)
  87. return response.AccountID, nil
  88. }
  89. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  90. //log.Infof("[server register app] request:%v.", utils.ToString(request))
  91. //result, err := w.requestServer("/v2/app/create", request)
  92. _, err := w.requestServer("/v2/app/create", request)
  93. //log.Infof("[server register app] resp data:%v.", string(result))
  94. if err != nil {
  95. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  96. return err
  97. }
  98. if *flags.RegisterAppToDoop {
  99. if len(w.proxyClient.Endpoints) > 0 {
  100. endpointUrl := w.proxyClient.Endpoints[0].Url
  101. if !strings.Contains(endpointUrl, doopUriKeyword) {
  102. doopUriPrefix := "/" + doopUriKeyword
  103. if strings.HasSuffix(endpointUrl, "/") {
  104. doopUriPrefix = doopUriKeyword
  105. }
  106. // 再次调用注册接口,向doop注册app信息
  107. _, err := w.requestServer(doopUriPrefix+"/v2/app/create", request)
  108. if err != nil {
  109. log.WithError(err).Errorf("[server register] Failed RegisterApp to DOOP request:%v.", utils.ToString(request))
  110. return err
  111. }
  112. }
  113. }
  114. }
  115. return nil
  116. }
  117. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  118. //log.Infof("[server whitelist] request:%v.", request.String())
  119. response := WhiteData{}
  120. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  121. //log.Infof("[server whitelist] resp data:%v.", string(result))
  122. if err != nil {
  123. return response, err
  124. }
  125. err = json.Unmarshal(result, &response)
  126. if err != nil {
  127. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteList request:%v.", request.String())
  128. return response, err
  129. }
  130. return response, nil
  131. }
  132. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  133. //log.Infof("[server whitelist] request:%v.", request.String())
  134. response := WhiteDataV2{}
  135. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  136. //log.Infof("[server whitelist] resp data:%v.", string(result))
  137. if err != nil {
  138. return response, err
  139. }
  140. err = json.Unmarshal(result, &response)
  141. if err != nil {
  142. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteListV2 request:%v.", request.String())
  143. return response, err
  144. }
  145. return response, nil
  146. }
  147. type ServerHTTPWorker struct {
  148. currentIndex int // 标识当前使用的是那个链接方式
  149. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  150. mux *sync.Mutex
  151. accountID int
  152. hostID string
  153. registUrl string
  154. installReport string
  155. heartUrl string
  156. taskUrl string
  157. returnTaskUrl string
  158. installAgentUrl string
  159. token string
  160. connectTestUrl string
  161. doccServer string
  162. daemonId string
  163. connectList []string
  164. tokenList []string
  165. //nodeInfo *NodeInfo
  166. prefix string
  167. proxyClient *Client
  168. }
  169. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  170. s := &ServerHTTPWorker{
  171. accountID: 1,
  172. //daemonId: daemonId,
  173. //currentIndex: -1,
  174. //connectList: strings.Split(connect, ","),
  175. //tokenList: strings.Split(token, ","),
  176. mux: new(sync.Mutex),
  177. }
  178. s.prefix = *flags.ServerPrefix
  179. s.proxyClient, _ = GetProxyClient()
  180. //s.nodeInfo,_ = node.GetNodeInfo()
  181. return s, nil
  182. }
  183. // func (w *serverHTTPWorker) updateUrl(index int) int {
  184. // w.mux.Lock()
  185. // defer w.mux.Unlock()
  186. // // if index != w.currentIndex {
  187. // // return w.currentIndex
  188. // // }
  189. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  190. // connect := ""
  191. // length := len(w.tokenList)
  192. // if length > 0 {
  193. // w.token = w.tokenList[0]
  194. // }
  195. //
  196. // // if !strings.HasSuffix(connect, "http") {
  197. // // connect = fmt.Sprintf("http://%s", connect)
  198. // // }
  199. //
  200. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  201. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  202. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  203. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  204. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  205. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  206. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  207. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  208. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  209. // return w.currentIndex
  210. // }
  211. //
  212. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  213. // response := RegistorRespData{}
  214. // data, err := json.Marshal(r)
  215. // if err != nil {
  216. // log.Errorf("register marshal request error:%s.", err)
  217. // return response, err
  218. // }
  219. // log.Infof("register request %s", string(data))
  220. // result, err := w.requestServer(registerModel, data)
  221. // if err != nil {
  222. // return response, err
  223. // }
  224. //
  225. // err = json.Unmarshal(result, &response)
  226. // if err != nil {
  227. // log.Errorf("register unmarshal response error:%s.", err)
  228. // return response, err
  229. // }
  230. //
  231. // return response, nil
  232. // }
  233. //
  234. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  235. // data, err := json.Marshal(r)
  236. // if err != nil {
  237. // log.Errorf("install report marshal request error:%s.", err)
  238. // return err
  239. // }
  240. // log.Infof("install report request %s", string(data))
  241. // if _, err := w.requestServer(installReportModel, data); err != nil {
  242. // return err
  243. // }
  244. // return nil
  245. // }
  246. //
  247. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  248. // data, err := json.Marshal(r)
  249. // if err != nil {
  250. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  251. // return err
  252. // }
  253. // log.Infof("ConnectTest request:%s.", string(data))
  254. // _, err = w.requestServer(connectTestModel, data)
  255. // return err
  256. // }
  257. //
  258. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  259. // response := RegistorRespData{}
  260. // data, err := json.Marshal(r)
  261. // if err != nil {
  262. // log.Errorf("sync agent runner serialize error:%s.", err)
  263. // return response.Agents, err
  264. // }
  265. // log.Infof("sync agent runner request:%s.", string(data))
  266. // result, err := w.requestServer(syncAgentInfoModel, data)
  267. // if err != nil {
  268. // return response.Agents, err
  269. // }
  270. //
  271. // err = json.Unmarshal(result, &response)
  272. // if err != nil {
  273. // log.Errorf("sync agent runner parser error return:%s.", err)
  274. // return response.Agents, err
  275. // }
  276. //
  277. // return response.Agents, nil
  278. // }
  279. //
  280. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  281. // log.Infof("heartbeat request:%v.", request)
  282. // response := HB_Config{}
  283. // // On a MacBook: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  284. // data, err := json.Marshal(request)
  285. // if err != nil {
  286. // log.Errorf("heartbeat marshal request error:%s.", err)
  287. // return response.HB_Config_Detail, err
  288. // }
  289. //
  290. // log.Infof("heartbeat request:%s.", string(data))
  291. // result, err := w.requestServer(heartModel, data)
  292. // if err != nil {
  293. // return response.HB_Config_Detail, err
  294. // }
  295. //
  296. // err = json.Unmarshal(result, &response)
  297. // if err != nil {
  298. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  299. // return response.HB_Config_Detail, err
  300. // }
  301. // return response.HB_Config_Detail, nil
  302. // }
  303. //
  304. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  305. // defer func() {
  306. // if err := recover(); err != nil {
  307. // log.Errorf("get task panic, error is <%v>", err)
  308. // }
  309. // }()
  310. // data, err := json.Marshal(request)
  311. // if err != nil {
  312. // log.Errorf("get task serialize get task request, error is <%s>", err)
  313. // return nil, err
  314. // }
  315. //
  316. // log.Infof("get task request:%s.", string(data))
  317. //
  318. // result := RespJson{}
  319. // // index := w.getIndex()
  320. // // for i := 0; i <= 0; i++ {
  321. // url := w.taskUrl
  322. // // fmt.Println("ReceiveTask:", url)
  323. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  324. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  325. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  326. // if err != nil {
  327. // log.WithError(err).Errorf("proxy server error")
  328. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  329. // }
  330. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  331. //
  332. // err = json.Unmarshal(response, &result)
  333. // if err != nil {
  334. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  335. // url, err, string(response))
  336. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  337. // url, err, string(response))
  338. //
  339. // }
  340. //
  341. // if result.Code != 100000 {
  342. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  343. // url, result.Code, result.Msg)
  344. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  345. // url, result.Code, result.Msg)
  346. // }
  347. //
  348. // if err != nil {
  349. // log.Errorf("failed with request task, error is <%v>", err)
  350. // return nil, err
  351. // }
  352. //
  353. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  354. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  355. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  356. //
  357. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  358. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  359. // if err != nil {
  360. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  361. // return nil, err
  362. // }
  363. //
  364. // log.Infof("task info is <%s>", string(plainText))
  365. // err = json.Unmarshal(plainText, &tasks)
  366. // if err != nil {
  367. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  368. // return nil, err
  369. // }
  370. //
  371. // return tasks, nil
  372. // }
  373. //
  374. // err = json.Unmarshal(result.Data, &tasks)
  375. // if err != nil {
  376. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  377. // return nil, err
  378. // }
  379. // return tasks, nil
  380. // }
  381. //
  382. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  383. // data, err := json.Marshal(taskResult)
  384. // if err != nil {
  385. // log.Errorf("report task result serialize error:%s.", err)
  386. // return nil, err
  387. // }
  388. // log.Infof("report task result request:%s.", string(data))
  389. //
  390. // data, err = w.requestServer(reportTaskResultModel, data)
  391. // return data, err
  392. // }
  393. //
  394. // func (w *serverHTTPWorker) getIndex() int {
  395. // w.mux.Lock()
  396. // defer w.mux.Unlock()
  397. // return w.currentIndex
  398. // }
  399. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  400. uri = w.prefix + uri
  401. byteData, err := json.Marshal(data)
  402. if err != nil {
  403. log.WithError(err).Errorf("[requestServer] marshal request error.")
  404. return byteData, err
  405. }
  406. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  407. map[string]string{"Content-Type": "application/json"})
  408. if err != nil {
  409. log.WithError(err).Errorf("[requestServer] proxy server error")
  410. // index = w.updateUrl(index)
  411. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  412. }
  413. log.Infof("url %s response %s", uri, string(response))
  414. result := RespJson{}
  415. err = json.Unmarshal(response, &result)
  416. if err != nil {
  417. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  418. // index = w.updateUrl(index)
  419. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  420. string(response))
  421. }
  422. if result.Code != 1000 {
  423. // index = w.updateUrl(index)
  424. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  425. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  426. result.Msg)
  427. }
  428. return result.Data, nil
  429. }
  430. //
  431. //func (w *serverHTTPWorker) SetToken(token string) {
  432. // w.token = token
  433. //}
  434. //
  435. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  436. // return w.doccServer, w.token
  437. //}