serverWorker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. . "github.com/coroot/coroot-node-agent/utils/modelse"
  8. log "github.com/sirupsen/logrus"
  9. "net/http"
  10. "sync"
  11. //
  12. //log "github.com/sirupsen/logrus"
  13. )
  14. const (
  15. registerModel = 0
  16. connectTestModel = iota
  17. syncAgentInfoModel
  18. heartModel
  19. receiveTaskModel
  20. reportTaskResultModel
  21. installReportModel
  22. urlModel = "%s/api/ext/gaia/daemon/%s"
  23. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  24. )
  25. type ServerWorker interface {
  26. //InstallReport(r ReportRequest) error
  27. RegisterHost(RegisterHostReq) error
  28. RegisterApp(RegisterAppReq) error
  29. WhiteList(WhiteListReq) (WhiteData, error)
  30. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  31. PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
  32. GetCodeSetting(CodeSettingReq) (CodeSettingResp, error)
  33. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  34. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  35. //ReceiveTask([]interface{}) ([]Task, error)
  36. //ReportTaskResult([]TaskResult) ([]byte, error)
  37. //SetToken(token string)
  38. //GetInfo() (string, string)
  39. }
  40. func (w *ServerHTTPWorker) GetCodeSetting(request CodeSettingReq) (CodeSettingResp, error) {
  41. //log.Infof("[get GetCodeSetting] request:%v.", utils.ToString(request))
  42. response := CodeSettingResp{}
  43. result, err := w.requestServer("/api/v70/agent/getCodeSetting", request)
  44. //log.Infof("[get GetCodeSetting] resp data:%v.", string(result))
  45. if err != nil {
  46. return response, err
  47. }
  48. err = json.Unmarshal(result, &response)
  49. if err != nil {
  50. log.WithError(err).Errorf("[get GetCodeSetting] Failed GetCodeSetting request:%v.", utils.ToString(request))
  51. return response, err
  52. }
  53. return response, nil
  54. }
  55. func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) {
  56. //log.Infof("[get all appinfo] request:%v.", utils.ToString(request))
  57. response := EbpfAppResp{}
  58. result, err := w.requestServer("/api/v70/ebpfAppInfo", request)
  59. //log.Infof("[get all appinfo] resp data:%v.", string(result))
  60. if err != nil {
  61. return response, err
  62. }
  63. err = json.Unmarshal(result, &response)
  64. if err != nil {
  65. log.WithError(err).Errorf("[get all appinfo] Failed PullAllAppInfo request:%v.", utils.ToString(request))
  66. return response, err
  67. }
  68. return response, nil
  69. }
  70. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error {
  71. //log.Infof("[server register host] request:%v.", utils.ToString(request))
  72. //result, err := w.requestServer("/v2/app/registerHost", request)
  73. _, err := w.requestServer("/v2/app/registerHost", request)
  74. //log.Infof("[server register host] resp:%v.", string(result))
  75. if err != nil {
  76. log.WithError(err).Errorf("[server register] Failed RegisterHost request:%v.", utils.ToString(request))
  77. return err
  78. }
  79. return nil
  80. }
  81. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  82. //log.Infof("[server register app] request:%v.", utils.ToString(request))
  83. //result, err := w.requestServer("/v2/app/create", request)
  84. _, err := w.requestServer("/v2/app/create", request)
  85. //log.Infof("[server register app] resp data:%v.", string(result))
  86. if err != nil {
  87. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  88. return err
  89. }
  90. return nil
  91. }
  92. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  93. //log.Infof("[server whitelist] request:%v.", request.String())
  94. response := WhiteData{}
  95. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  96. //log.Infof("[server whitelist] resp data:%v.", string(result))
  97. if err != nil {
  98. return response, err
  99. }
  100. err = json.Unmarshal(result, &response)
  101. if err != nil {
  102. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteList request:%v.", request.String())
  103. return response, err
  104. }
  105. return response, nil
  106. }
  107. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  108. //log.Infof("[server whitelist] request:%v.", request.String())
  109. response := WhiteDataV2{}
  110. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  111. // log.Infof("[server whitelist] resp data:%v.", string(result))
  112. if err != nil {
  113. return response, err
  114. }
  115. err = json.Unmarshal(result, &response)
  116. if err != nil {
  117. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteListV2 request:%v.", request.String())
  118. return response, err
  119. }
  120. return response, nil
  121. }
  122. type ServerHTTPWorker struct {
  123. currentIndex int // 标识当前使用的是那个链接方式
  124. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  125. mux *sync.Mutex
  126. accountID int
  127. hostID string
  128. registUrl string
  129. installReport string
  130. heartUrl string
  131. taskUrl string
  132. returnTaskUrl string
  133. installAgentUrl string
  134. token string
  135. connectTestUrl string
  136. doccServer string
  137. daemonId string
  138. connectList []string
  139. tokenList []string
  140. //nodeInfo *NodeInfo
  141. prefix string
  142. proxyClient *Client
  143. }
  144. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  145. s := &ServerHTTPWorker{
  146. accountID: 1,
  147. //daemonId: daemonId,
  148. //currentIndex: -1,
  149. //connectList: strings.Split(connect, ","),
  150. //tokenList: strings.Split(token, ","),
  151. mux: new(sync.Mutex),
  152. }
  153. s.prefix = *flags.ServerPrefix
  154. s.proxyClient, _ = GetProxyClient()
  155. //s.nodeInfo,_ = node.GetNodeInfo()
  156. return s, nil
  157. }
  158. // func (w *serverHTTPWorker) updateUrl(index int) int {
  159. // w.mux.Lock()
  160. // defer w.mux.Unlock()
  161. // // if index != w.currentIndex {
  162. // // return w.currentIndex
  163. // // }
  164. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  165. // connect := ""
  166. // length := len(w.tokenList)
  167. // if length > 0 {
  168. // w.token = w.tokenList[0]
  169. // }
  170. //
  171. // // if !strings.HasSuffix(connect, "http") {
  172. // // connect = fmt.Sprintf("http://%s", connect)
  173. // // }
  174. //
  175. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  176. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  177. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  178. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  179. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  180. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  181. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  182. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  183. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  184. // return w.currentIndex
  185. // }
  186. //
  187. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  188. // response := RegistorRespData{}
  189. // data, err := json.Marshal(r)
  190. // if err != nil {
  191. // log.Errorf("register marshal request error:%s.", err)
  192. // return response, err
  193. // }
  194. // log.Infof("register request %s", string(data))
  195. // result, err := w.requestServer(registerModel, data)
  196. // if err != nil {
  197. // return response, err
  198. // }
  199. //
  200. // err = json.Unmarshal(result, &response)
  201. // if err != nil {
  202. // log.Errorf("register unmarshal response error:%s.", err)
  203. // return response, err
  204. // }
  205. //
  206. // return response, nil
  207. // }
  208. //
  209. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  210. // data, err := json.Marshal(r)
  211. // if err != nil {
  212. // log.Errorf("install report marshal request error:%s.", err)
  213. // return err
  214. // }
  215. // log.Infof("install report request %s", string(data))
  216. // if _, err := w.requestServer(installReportModel, data); err != nil {
  217. // return err
  218. // }
  219. // return nil
  220. // }
  221. //
  222. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  223. // data, err := json.Marshal(r)
  224. // if err != nil {
  225. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  226. // return err
  227. // }
  228. // log.Infof("ConnectTest request:%s.", string(data))
  229. // _, err = w.requestServer(connectTestModel, data)
  230. // return err
  231. // }
  232. //
  233. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  234. // response := RegistorRespData{}
  235. // data, err := json.Marshal(r)
  236. // if err != nil {
  237. // log.Errorf("sync agent runner serialize error:%s.", err)
  238. // return response.Agents, err
  239. // }
  240. // log.Infof("sync agent runner request:%s.", string(data))
  241. // result, err := w.requestServer(syncAgentInfoModel, data)
  242. // if err != nil {
  243. // return response.Agents, err
  244. // }
  245. //
  246. // err = json.Unmarshal(result, &response)
  247. // if err != nil {
  248. // log.Errorf("sync agent runner parser error return:%s.", err)
  249. // return response.Agents, err
  250. // }
  251. //
  252. // return response.Agents, nil
  253. // }
  254. //
  255. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  256. // log.Infof("heartbeat request:%v.", request)
  257. // response := HB_Config{}
  258. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  259. // data, err := json.Marshal(request)
  260. // if err != nil {
  261. // log.Errorf("heartbeat marshal request error:%s.", err)
  262. // return response.HB_Config_Detail, err
  263. // }
  264. //
  265. // log.Infof("heartbeat request:%s.", string(data))
  266. // result, err := w.requestServer(heartModel, data)
  267. // if err != nil {
  268. // return response.HB_Config_Detail, err
  269. // }
  270. //
  271. // err = json.Unmarshal(result, &response)
  272. // if err != nil {
  273. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  274. // return response.HB_Config_Detail, err
  275. // }
  276. // return response.HB_Config_Detail, nil
  277. // }
  278. //
  279. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  280. // defer func() {
  281. // if err := recover(); err != nil {
  282. // log.Errorf("get task panic, error is <%v>", err)
  283. // }
  284. // }()
  285. // data, err := json.Marshal(request)
  286. // if err != nil {
  287. // log.Errorf("get task serialize get task request, error is <%s>", err)
  288. // return nil, err
  289. // }
  290. //
  291. // log.Infof("get task request:%s.", string(data))
  292. //
  293. // result := RespJson{}
  294. // // index := w.getIndex()
  295. // // for i := 0; i <= 0; i++ {
  296. // url := w.taskUrl
  297. // // fmt.Println("ReceiveTask:", url)
  298. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  299. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  300. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  301. // if err != nil {
  302. // log.WithError(err).Errorf("proxy server error")
  303. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  304. // }
  305. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  306. //
  307. // err = json.Unmarshal(response, &result)
  308. // if err != nil {
  309. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  310. // url, err, string(response))
  311. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  312. // url, err, string(response))
  313. //
  314. // }
  315. //
  316. // if result.Code != 100000 {
  317. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  318. // url, result.Code, result.Msg)
  319. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  320. // url, result.Code, result.Msg)
  321. // }
  322. //
  323. // if err != nil {
  324. // log.Errorf("failed with request task, error is <%v>", err)
  325. // return nil, err
  326. // }
  327. //
  328. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  329. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  330. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  331. //
  332. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  333. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  334. // if err != nil {
  335. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  336. // return nil, err
  337. // }
  338. //
  339. // log.Infof("task info is <%s>", string(plainText))
  340. // err = json.Unmarshal(plainText, &tasks)
  341. // if err != nil {
  342. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  343. // return nil, err
  344. // }
  345. //
  346. // return tasks, nil
  347. // }
  348. //
  349. // err = json.Unmarshal(result.Data, &tasks)
  350. // if err != nil {
  351. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  352. // return nil, err
  353. // }
  354. // return tasks, nil
  355. // }
  356. //
  357. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  358. // data, err := json.Marshal(taskResult)
  359. // if err != nil {
  360. // log.Errorf("report task result serialize error:%s.", err)
  361. // return nil, err
  362. // }
  363. // log.Infof("report task result request:%s.", string(data))
  364. //
  365. // data, err = w.requestServer(reportTaskResultModel, data)
  366. // return data, err
  367. // }
  368. //
  369. // func (w *serverHTTPWorker) getIndex() int {
  370. // w.mux.Lock()
  371. // defer w.mux.Unlock()
  372. // return w.currentIndex
  373. // }
  374. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  375. uri = w.prefix + uri
  376. byteData, err := json.Marshal(data)
  377. if err != nil {
  378. log.WithError(err).Errorf("[requestServer] marshal request error.")
  379. return byteData, err
  380. }
  381. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  382. map[string]string{"Content-Type": "application/json"})
  383. if err != nil {
  384. log.WithError(err).Errorf("[requestServer] proxy server error")
  385. // index = w.updateUrl(index)
  386. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  387. }
  388. log.Infof("url %s response %s", uri, string(response))
  389. result := RespJson{}
  390. err = json.Unmarshal(response, &result)
  391. if err != nil {
  392. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  393. // index = w.updateUrl(index)
  394. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  395. string(response))
  396. }
  397. if result.Code != 1000 {
  398. // index = w.updateUrl(index)
  399. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  400. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  401. result.Msg)
  402. }
  403. return result.Data, nil
  404. }
  405. //
  406. //func (w *serverHTTPWorker) SetToken(token string) {
  407. // w.token = token
  408. //}
  409. //
  410. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  411. // return w.doccServer, w.token
  412. //}