serverWorker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "github.com/coroot/coroot-node-agent/flags"
  8. "github.com/coroot/coroot-node-agent/utils"
  9. . "github.com/coroot/coroot-node-agent/utils/modelse"
  10. log "github.com/sirupsen/logrus"
  11. //
  12. //log "github.com/sirupsen/logrus"
  13. )
  14. const (
  15. registerModel = 0
  16. connectTestModel = iota
  17. syncAgentInfoModel
  18. heartModel
  19. receiveTaskModel
  20. reportTaskResultModel
  21. installReportModel
  22. urlModel = "%s/api/ext/gaia/daemon/%s"
  23. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  24. )
  25. type ServerWorker interface {
  26. //InstallReport(r ReportRequest) error
  27. RegisterHost(RegisterHostReq) (int, error)
  28. RegisterApp(RegisterAppReq) error
  29. WhiteList(WhiteListReq) (WhiteData, error)
  30. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  31. PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
  32. GetCodeSetting(CodeSettingReq) (CodeSettingResp, error)
  33. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  34. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  35. //ReceiveTask([]interface{}) ([]Task, error)
  36. //ReportTaskResult([]TaskResult) ([]byte, error)
  37. //SetToken(token string)
  38. //GetInfo() (string, string)
  39. }
  40. func (w *ServerHTTPWorker) GetCodeSetting(request CodeSettingReq) (CodeSettingResp, error) {
  41. //log.Infof("[get GetCodeSetting] request:%v.", utils.ToString(request))
  42. response := CodeSettingResp{}
  43. result, err := w.requestServer("/api/v70/agent/getCodeSetting", request)
  44. //log.Infof("[get GetCodeSetting] resp data:%v.", string(result))
  45. if err != nil {
  46. return response, err
  47. }
  48. err = json.Unmarshal(result, &response)
  49. if err != nil {
  50. log.WithError(err).Errorf("[get GetCodeSetting] Failed GetCodeSetting request:%v.", utils.ToString(request))
  51. return response, err
  52. }
  53. return response, nil
  54. }
  55. func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) {
  56. //log.Infof("[get all appinfo] request:%v.", utils.ToString(request))
  57. response := EbpfAppResp{}
  58. result, err := w.requestServer("/api/v70/ebpfAppInfo", request)
  59. //log.Infof("[get all appinfo] resp data:%v.", string(result))
  60. if err != nil {
  61. return response, err
  62. }
  63. err = json.Unmarshal(result, &response)
  64. if err != nil {
  65. log.WithError(err).Errorf("[get all appinfo] Failed PullAllAppInfo request:%v.", utils.ToString(request))
  66. return response, err
  67. }
  68. return response, nil
  69. }
  70. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) (int, error) {
  71. //log.Infof("[server register host] request:%v.", utils.ToString(request))
  72. result, err := w.requestServer("/v2/app/registerHost", request)
  73. //log.Infof("[server register host] resp:%v.", string(result))
  74. if err != nil {
  75. log.WithError(err).Errorf("[server register] Failed RegisterHost request:%v.", utils.ToString(request))
  76. return 0, err
  77. }
  78. var response RegisterHostResponse
  79. err = json.Unmarshal(result, &response)
  80. if err != nil {
  81. log.WithError(err).Errorf("[server register] Failed to parse RegisterHost response: %s", string(result))
  82. return 0, err
  83. }
  84. log.Infof("[server register] RegisterHost success, account_id: %d", response.AccountID)
  85. return response.AccountID, nil
  86. }
  87. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  88. //log.Infof("[server register app] request:%v.", utils.ToString(request))
  89. //result, err := w.requestServer("/v2/app/create", request)
  90. _, err := w.requestServer("/v2/app/create", request)
  91. //log.Infof("[server register app] resp data:%v.", string(result))
  92. if err != nil {
  93. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  94. return err
  95. }
  96. return nil
  97. }
  98. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  99. //log.Infof("[server whitelist] request:%v.", request.String())
  100. response := WhiteData{}
  101. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  102. //log.Infof("[server whitelist] resp data:%v.", string(result))
  103. if err != nil {
  104. return response, err
  105. }
  106. err = json.Unmarshal(result, &response)
  107. if err != nil {
  108. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteList request:%v.", request.String())
  109. return response, err
  110. }
  111. return response, nil
  112. }
  113. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  114. //log.Infof("[server whitelist] request:%v.", request.String())
  115. response := WhiteDataV2{}
  116. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  117. //log.Infof("[server whitelist] resp data:%v.", string(result))
  118. if err != nil {
  119. return response, err
  120. }
  121. err = json.Unmarshal(result, &response)
  122. if err != nil {
  123. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteListV2 request:%v.", request.String())
  124. return response, err
  125. }
  126. return response, nil
  127. }
  128. type ServerHTTPWorker struct {
  129. currentIndex int // 标识当前使用的是那个链接方式
  130. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  131. mux *sync.Mutex
  132. accountID int
  133. hostID string
  134. registUrl string
  135. installReport string
  136. heartUrl string
  137. taskUrl string
  138. returnTaskUrl string
  139. installAgentUrl string
  140. token string
  141. connectTestUrl string
  142. doccServer string
  143. daemonId string
  144. connectList []string
  145. tokenList []string
  146. //nodeInfo *NodeInfo
  147. prefix string
  148. proxyClient *Client
  149. }
  150. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  151. s := &ServerHTTPWorker{
  152. accountID: 1,
  153. //daemonId: daemonId,
  154. //currentIndex: -1,
  155. //connectList: strings.Split(connect, ","),
  156. //tokenList: strings.Split(token, ","),
  157. mux: new(sync.Mutex),
  158. }
  159. s.prefix = *flags.ServerPrefix
  160. s.proxyClient, _ = GetProxyClient()
  161. //s.nodeInfo,_ = node.GetNodeInfo()
  162. return s, nil
  163. }
  164. // func (w *serverHTTPWorker) updateUrl(index int) int {
  165. // w.mux.Lock()
  166. // defer w.mux.Unlock()
  167. // // if index != w.currentIndex {
  168. // // return w.currentIndex
  169. // // }
  170. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  171. // connect := ""
  172. // length := len(w.tokenList)
  173. // if length > 0 {
  174. // w.token = w.tokenList[0]
  175. // }
  176. //
  177. // // if !strings.HasSuffix(connect, "http") {
  178. // // connect = fmt.Sprintf("http://%s", connect)
  179. // // }
  180. //
  181. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  182. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  183. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  184. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  185. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  186. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  187. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  188. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  189. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  190. // return w.currentIndex
  191. // }
  192. //
  193. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  194. // response := RegistorRespData{}
  195. // data, err := json.Marshal(r)
  196. // if err != nil {
  197. // log.Errorf("register marshal request error:%s.", err)
  198. // return response, err
  199. // }
  200. // log.Infof("register request %s", string(data))
  201. // result, err := w.requestServer(registerModel, data)
  202. // if err != nil {
  203. // return response, err
  204. // }
  205. //
  206. // err = json.Unmarshal(result, &response)
  207. // if err != nil {
  208. // log.Errorf("register unmarshal response error:%s.", err)
  209. // return response, err
  210. // }
  211. //
  212. // return response, nil
  213. // }
  214. //
  215. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  216. // data, err := json.Marshal(r)
  217. // if err != nil {
  218. // log.Errorf("install report marshal request error:%s.", err)
  219. // return err
  220. // }
  221. // log.Infof("install report request %s", string(data))
  222. // if _, err := w.requestServer(installReportModel, data); err != nil {
  223. // return err
  224. // }
  225. // return nil
  226. // }
  227. //
  228. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  229. // data, err := json.Marshal(r)
  230. // if err != nil {
  231. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  232. // return err
  233. // }
  234. // log.Infof("ConnectTest request:%s.", string(data))
  235. // _, err = w.requestServer(connectTestModel, data)
  236. // return err
  237. // }
  238. //
  239. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  240. // response := RegistorRespData{}
  241. // data, err := json.Marshal(r)
  242. // if err != nil {
  243. // log.Errorf("sync agent runner serialize error:%s.", err)
  244. // return response.Agents, err
  245. // }
  246. // log.Infof("sync agent runner request:%s.", string(data))
  247. // result, err := w.requestServer(syncAgentInfoModel, data)
  248. // if err != nil {
  249. // return response.Agents, err
  250. // }
  251. //
  252. // err = json.Unmarshal(result, &response)
  253. // if err != nil {
  254. // log.Errorf("sync agent runner parser error return:%s.", err)
  255. // return response.Agents, err
  256. // }
  257. //
  258. // return response.Agents, nil
  259. // }
  260. //
  261. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  262. // log.Infof("heartbeat request:%v.", request)
  263. // response := HB_Config{}
  264. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  265. // data, err := json.Marshal(request)
  266. // if err != nil {
  267. // log.Errorf("heartbeat marshal request error:%s.", err)
  268. // return response.HB_Config_Detail, err
  269. // }
  270. //
  271. // log.Infof("heartbeat request:%s.", string(data))
  272. // result, err := w.requestServer(heartModel, data)
  273. // if err != nil {
  274. // return response.HB_Config_Detail, err
  275. // }
  276. //
  277. // err = json.Unmarshal(result, &response)
  278. // if err != nil {
  279. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  280. // return response.HB_Config_Detail, err
  281. // }
  282. // return response.HB_Config_Detail, nil
  283. // }
  284. //
  285. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  286. // defer func() {
  287. // if err := recover(); err != nil {
  288. // log.Errorf("get task panic, error is <%v>", err)
  289. // }
  290. // }()
  291. // data, err := json.Marshal(request)
  292. // if err != nil {
  293. // log.Errorf("get task serialize get task request, error is <%s>", err)
  294. // return nil, err
  295. // }
  296. //
  297. // log.Infof("get task request:%s.", string(data))
  298. //
  299. // result := RespJson{}
  300. // // index := w.getIndex()
  301. // // for i := 0; i <= 0; i++ {
  302. // url := w.taskUrl
  303. // // fmt.Println("ReceiveTask:", url)
  304. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  305. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  306. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  307. // if err != nil {
  308. // log.WithError(err).Errorf("proxy server error")
  309. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  310. // }
  311. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  312. //
  313. // err = json.Unmarshal(response, &result)
  314. // if err != nil {
  315. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  316. // url, err, string(response))
  317. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  318. // url, err, string(response))
  319. //
  320. // }
  321. //
  322. // if result.Code != 100000 {
  323. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  324. // url, result.Code, result.Msg)
  325. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  326. // url, result.Code, result.Msg)
  327. // }
  328. //
  329. // if err != nil {
  330. // log.Errorf("failed with request task, error is <%v>", err)
  331. // return nil, err
  332. // }
  333. //
  334. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  335. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  336. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  337. //
  338. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  339. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  340. // if err != nil {
  341. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  342. // return nil, err
  343. // }
  344. //
  345. // log.Infof("task info is <%s>", string(plainText))
  346. // err = json.Unmarshal(plainText, &tasks)
  347. // if err != nil {
  348. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  349. // return nil, err
  350. // }
  351. //
  352. // return tasks, nil
  353. // }
  354. //
  355. // err = json.Unmarshal(result.Data, &tasks)
  356. // if err != nil {
  357. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  358. // return nil, err
  359. // }
  360. // return tasks, nil
  361. // }
  362. //
  363. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  364. // data, err := json.Marshal(taskResult)
  365. // if err != nil {
  366. // log.Errorf("report task result serialize error:%s.", err)
  367. // return nil, err
  368. // }
  369. // log.Infof("report task result request:%s.", string(data))
  370. //
  371. // data, err = w.requestServer(reportTaskResultModel, data)
  372. // return data, err
  373. // }
  374. //
  375. // func (w *serverHTTPWorker) getIndex() int {
  376. // w.mux.Lock()
  377. // defer w.mux.Unlock()
  378. // return w.currentIndex
  379. // }
  380. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  381. uri = w.prefix + uri
  382. byteData, err := json.Marshal(data)
  383. if err != nil {
  384. log.WithError(err).Errorf("[requestServer] marshal request error.")
  385. return byteData, err
  386. }
  387. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  388. map[string]string{"Content-Type": "application/json"})
  389. if err != nil {
  390. log.WithError(err).Errorf("[requestServer] proxy server error")
  391. // index = w.updateUrl(index)
  392. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  393. }
  394. log.Infof("url %s response %s", uri, string(response))
  395. result := RespJson{}
  396. err = json.Unmarshal(response, &result)
  397. if err != nil {
  398. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  399. // index = w.updateUrl(index)
  400. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  401. string(response))
  402. }
  403. if result.Code != 1000 {
  404. // index = w.updateUrl(index)
  405. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  406. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  407. result.Msg)
  408. }
  409. return result.Data, nil
  410. }
  411. //
  412. //func (w *serverHTTPWorker) SetToken(token string) {
  413. // w.token = token
  414. //}
  415. //
  416. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  417. // return w.doccServer, w.token
  418. //}