serverWorker.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. . "github.com/coroot/coroot-node-agent/utils/modelse"
  8. log "github.com/sirupsen/logrus"
  9. "net/http"
  10. "sync"
  11. //
  12. //log "github.com/sirupsen/logrus"
  13. )
  14. const (
  15. registerModel = 0
  16. connectTestModel = iota
  17. syncAgentInfoModel
  18. heartModel
  19. receiveTaskModel
  20. reportTaskResultModel
  21. installReportModel
  22. urlModel = "%s/api/ext/gaia/daemon/%s"
  23. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  24. )
  25. type ServerWorker interface {
  26. //InstallReport(r ReportRequest) error
  27. RegisterHost(RegisterHostReq) error
  28. RegisterApp(RegisterAppReq) error
  29. WhiteList(WhiteListReq) (WhiteData, error)
  30. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  31. PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
  32. GetCodeSetting(CodeSettingReq) (CodeSettingResp, error)
  33. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  34. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  35. //ReceiveTask([]interface{}) ([]Task, error)
  36. //ReportTaskResult([]TaskResult) ([]byte, error)
  37. //SetToken(token string)
  38. //GetInfo() (string, string)
  39. }
  40. func (w *ServerHTTPWorker) GetCodeSetting(request CodeSettingReq) (CodeSettingResp, error) {
  41. //log.Infof("[get GetCodeSetting] request:%v.", utils.ToString(request))
  42. response := CodeSettingResp{}
  43. result, err := w.requestServer("/api/v70/agent/getCodeSetting", request)
  44. //log.Infof("[get GetCodeSetting] resp data:%v.", string(result))
  45. if err != nil {
  46. return response, err
  47. }
  48. err = json.Unmarshal(result, &response)
  49. if err != nil {
  50. log.WithError(err).Errorf("[get GetCodeSetting] Failed GetCodeSetting request:%v.", utils.ToString(request))
  51. return response, err
  52. }
  53. return response, nil
  54. }
  55. func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) {
  56. //log.Infof("[get all appinfo] request:%v.", utils.ToString(request))
  57. response := EbpfAppResp{}
  58. result, err := w.requestServer("/api/v70/ebpfAppInfo", request)
  59. //log.Infof("[get all appinfo] resp data:%v.", string(result))
  60. if err != nil {
  61. return response, err
  62. }
  63. err = json.Unmarshal(result, &response)
  64. if err != nil {
  65. log.WithError(err).Errorf("[get all appinfo] Failed PullAllAppInfo request:%v.", utils.ToString(request))
  66. return response, err
  67. }
  68. return response, nil
  69. }
  70. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error {
  71. log.Infof("[server register host] request:%v.", utils.ToString(request))
  72. result, err := w.requestServer("/v2/app/registerHost", request)
  73. log.Infof("[server register host] resp:%v.", string(result))
  74. if err != nil {
  75. log.WithError(err).Errorf("[server register] Failed RegisterHost request:%v.", utils.ToString(request))
  76. return err
  77. }
  78. return nil
  79. }
  80. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  81. log.Infof("[server register app] request:%v.", utils.ToString(request))
  82. result, err := w.requestServer("/v2/app/create", request)
  83. log.Infof("[server register app] resp data:%v.", string(result))
  84. if err != nil {
  85. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  86. return err
  87. }
  88. return nil
  89. }
  90. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  91. log.Infof("[server whitelist] request:%v.", request.String())
  92. response := WhiteData{}
  93. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  94. log.Infof("[server whitelist] resp data:%v.", string(result))
  95. if err != nil {
  96. return response, err
  97. }
  98. err = json.Unmarshal(result, &response)
  99. if err != nil {
  100. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteList request:%v.", request.String())
  101. return response, err
  102. }
  103. return response, nil
  104. }
  105. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  106. log.Infof("[server whitelist] request:%v.", request.String())
  107. response := WhiteDataV2{}
  108. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  109. log.Infof("[server whitelist] resp data:%v.", string(result))
  110. if err != nil {
  111. return response, err
  112. }
  113. err = json.Unmarshal(result, &response)
  114. if err != nil {
  115. log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteListV2 request:%v.", request.String())
  116. return response, err
  117. }
  118. return response, nil
  119. }
  120. type ServerHTTPWorker struct {
  121. currentIndex int // 标识当前使用的是那个链接方式
  122. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  123. mux *sync.Mutex
  124. accountID int
  125. hostID string
  126. registUrl string
  127. installReport string
  128. heartUrl string
  129. taskUrl string
  130. returnTaskUrl string
  131. installAgentUrl string
  132. token string
  133. connectTestUrl string
  134. doccServer string
  135. daemonId string
  136. connectList []string
  137. tokenList []string
  138. //nodeInfo *NodeInfo
  139. prefix string
  140. proxyClient *Client
  141. }
  142. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  143. s := &ServerHTTPWorker{
  144. accountID: 1,
  145. //daemonId: daemonId,
  146. //currentIndex: -1,
  147. //connectList: strings.Split(connect, ","),
  148. //tokenList: strings.Split(token, ","),
  149. mux: new(sync.Mutex),
  150. }
  151. s.prefix = *flags.ServerPrefix
  152. s.proxyClient, _ = GetProxyClient()
  153. //s.nodeInfo,_ = node.GetNodeInfo()
  154. return s, nil
  155. }
  156. // func (w *serverHTTPWorker) updateUrl(index int) int {
  157. // w.mux.Lock()
  158. // defer w.mux.Unlock()
  159. // // if index != w.currentIndex {
  160. // // return w.currentIndex
  161. // // }
  162. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  163. // connect := ""
  164. // length := len(w.tokenList)
  165. // if length > 0 {
  166. // w.token = w.tokenList[0]
  167. // }
  168. //
  169. // // if !strings.HasSuffix(connect, "http") {
  170. // // connect = fmt.Sprintf("http://%s", connect)
  171. // // }
  172. //
  173. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  174. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  175. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  176. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  177. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  178. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  179. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  180. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  181. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  182. // return w.currentIndex
  183. // }
  184. //
  185. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  186. // response := RegistorRespData{}
  187. // data, err := json.Marshal(r)
  188. // if err != nil {
  189. // log.Errorf("register marshal request error:%s.", err)
  190. // return response, err
  191. // }
  192. // log.Infof("register request %s", string(data))
  193. // result, err := w.requestServer(registerModel, data)
  194. // if err != nil {
  195. // return response, err
  196. // }
  197. //
  198. // err = json.Unmarshal(result, &response)
  199. // if err != nil {
  200. // log.Errorf("register unmarshal response error:%s.", err)
  201. // return response, err
  202. // }
  203. //
  204. // return response, nil
  205. // }
  206. //
  207. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  208. // data, err := json.Marshal(r)
  209. // if err != nil {
  210. // log.Errorf("install report marshal request error:%s.", err)
  211. // return err
  212. // }
  213. // log.Infof("install report request %s", string(data))
  214. // if _, err := w.requestServer(installReportModel, data); err != nil {
  215. // return err
  216. // }
  217. // return nil
  218. // }
  219. //
  220. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  221. // data, err := json.Marshal(r)
  222. // if err != nil {
  223. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  224. // return err
  225. // }
  226. // log.Infof("ConnectTest request:%s.", string(data))
  227. // _, err = w.requestServer(connectTestModel, data)
  228. // return err
  229. // }
  230. //
  231. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  232. // response := RegistorRespData{}
  233. // data, err := json.Marshal(r)
  234. // if err != nil {
  235. // log.Errorf("sync agent runner serialize error:%s.", err)
  236. // return response.Agents, err
  237. // }
  238. // log.Infof("sync agent runner request:%s.", string(data))
  239. // result, err := w.requestServer(syncAgentInfoModel, data)
  240. // if err != nil {
  241. // return response.Agents, err
  242. // }
  243. //
  244. // err = json.Unmarshal(result, &response)
  245. // if err != nil {
  246. // log.Errorf("sync agent runner parser error return:%s.", err)
  247. // return response.Agents, err
  248. // }
  249. //
  250. // return response.Agents, nil
  251. // }
  252. //
  253. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  254. // log.Infof("heartbeat request:%v.", request)
  255. // response := HB_Config{}
  256. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  257. // data, err := json.Marshal(request)
  258. // if err != nil {
  259. // log.Errorf("heartbeat marshal request error:%s.", err)
  260. // return response.HB_Config_Detail, err
  261. // }
  262. //
  263. // log.Infof("heartbeat request:%s.", string(data))
  264. // result, err := w.requestServer(heartModel, data)
  265. // if err != nil {
  266. // return response.HB_Config_Detail, err
  267. // }
  268. //
  269. // err = json.Unmarshal(result, &response)
  270. // if err != nil {
  271. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  272. // return response.HB_Config_Detail, err
  273. // }
  274. // return response.HB_Config_Detail, nil
  275. // }
  276. //
  277. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  278. // defer func() {
  279. // if err := recover(); err != nil {
  280. // log.Errorf("get task panic, error is <%v>", err)
  281. // }
  282. // }()
  283. // data, err := json.Marshal(request)
  284. // if err != nil {
  285. // log.Errorf("get task serialize get task request, error is <%s>", err)
  286. // return nil, err
  287. // }
  288. //
  289. // log.Infof("get task request:%s.", string(data))
  290. //
  291. // result := RespJson{}
  292. // // index := w.getIndex()
  293. // // for i := 0; i <= 0; i++ {
  294. // url := w.taskUrl
  295. // // fmt.Println("ReceiveTask:", url)
  296. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  297. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  298. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  299. // if err != nil {
  300. // log.WithError(err).Errorf("proxy server error")
  301. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  302. // }
  303. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  304. //
  305. // err = json.Unmarshal(response, &result)
  306. // if err != nil {
  307. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  308. // url, err, string(response))
  309. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  310. // url, err, string(response))
  311. //
  312. // }
  313. //
  314. // if result.Code != 100000 {
  315. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  316. // url, result.Code, result.Msg)
  317. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  318. // url, result.Code, result.Msg)
  319. // }
  320. //
  321. // if err != nil {
  322. // log.Errorf("failed with request task, error is <%v>", err)
  323. // return nil, err
  324. // }
  325. //
  326. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  327. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  328. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  329. //
  330. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  331. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  332. // if err != nil {
  333. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  334. // return nil, err
  335. // }
  336. //
  337. // log.Infof("task info is <%s>", string(plainText))
  338. // err = json.Unmarshal(plainText, &tasks)
  339. // if err != nil {
  340. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  341. // return nil, err
  342. // }
  343. //
  344. // return tasks, nil
  345. // }
  346. //
  347. // err = json.Unmarshal(result.Data, &tasks)
  348. // if err != nil {
  349. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  350. // return nil, err
  351. // }
  352. // return tasks, nil
  353. // }
  354. //
  355. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  356. // data, err := json.Marshal(taskResult)
  357. // if err != nil {
  358. // log.Errorf("report task result serialize error:%s.", err)
  359. // return nil, err
  360. // }
  361. // log.Infof("report task result request:%s.", string(data))
  362. //
  363. // data, err = w.requestServer(reportTaskResultModel, data)
  364. // return data, err
  365. // }
  366. //
  367. // func (w *serverHTTPWorker) getIndex() int {
  368. // w.mux.Lock()
  369. // defer w.mux.Unlock()
  370. // return w.currentIndex
  371. // }
  372. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  373. uri = w.prefix + uri
  374. byteData, err := json.Marshal(data)
  375. if err != nil {
  376. log.WithError(err).Errorf("[requestServer] marshal request error.")
  377. return byteData, err
  378. }
  379. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  380. map[string]string{"Content-Type": "application/json"})
  381. if err != nil {
  382. log.WithError(err).Errorf("[requestServer] proxy server error")
  383. // index = w.updateUrl(index)
  384. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  385. }
  386. log.Infof("url %s response %s", uri, string(response))
  387. result := RespJson{}
  388. err = json.Unmarshal(response, &result)
  389. if err != nil {
  390. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  391. // index = w.updateUrl(index)
  392. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  393. string(response))
  394. }
  395. if result.Code != 1000 {
  396. // index = w.updateUrl(index)
  397. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  398. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  399. result.Msg)
  400. }
  401. return result.Data, nil
  402. }
  403. //
  404. //func (w *serverHTTPWorker) SetToken(token string) {
  405. // w.token = token
  406. //}
  407. //
  408. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  409. // return w.doccServer, w.token
  410. //}