serverWorker.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. . "github.com/coroot/coroot-node-agent/utils/modelse"
  8. log "github.com/sirupsen/logrus"
  9. "net/http"
  10. "sync"
  11. //
  12. //log "github.com/sirupsen/logrus"
  13. )
  // Integer identifiers for the request kinds of the legacy daemon protocol.
  // In the visible part of this file they are referenced only by
  // commented-out code. The values run from 0 (registerModel) through
  // 6 (installReportModel).
  const (
      registerModel = iota
      connectTestModel
      syncAgentInfoModel
      heartModel
      receiveTaskModel
      reportTaskResultModel
      installReportModel
  )

  // Format strings for the legacy daemon endpoints.
  const (
      // urlModel takes two %s arguments; the commented-out updateUrl in this
      // file fills them with the connect address and an endpoint name.
      urlModel = "%s/api/ext/gaia/daemon/%s"
      // urlModel_ takes three %s arguments; the commented-out updateUrl in
      // this file passes the daemon ID as the third one.
      urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  )
  25. type ServerWorker interface {
  26. //InstallReport(r ReportRequest) error
  27. RegisterHost(RegisterHostReq) error
  28. RegisterApp(RegisterAppReq) error
  29. WhiteList(WhiteListReq) (WhiteData, error)
  30. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  31. PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error)
  32. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  33. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  34. //ReceiveTask([]interface{}) ([]Task, error)
  35. //ReportTaskResult([]TaskResult) ([]byte, error)
  36. //SetToken(token string)
  37. //GetInfo() (string, string)
  38. }
  39. func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) {
  40. log.Infof("[get all appinfo] request:%v.", utils.ToString(request))
  41. response := EbpfAppResp{}
  42. result, err := w.requestServer("/api/v70/ebpfAppInfo", request)
  43. log.Infof("[get all appinfo] resp data:%v.", string(result))
  44. if err != nil {
  45. return response, err
  46. }
  47. err = json.Unmarshal(result, &response)
  48. if err != nil {
  49. log.WithError(err).Errorf("[get all appinfo] Failed get request:%v.", utils.ToString(request))
  50. return response, err
  51. }
  52. return response, nil
  53. }
  54. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error {
  55. log.Infof("[server register host] request:%v.", utils.ToString(request))
  56. result, err := w.requestServer("/v2/app/registerHost", request)
  57. log.Infof("[server register host] resp:%v.", string(result))
  58. if err != nil {
  59. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  60. return err
  61. }
  62. return nil
  63. }
  64. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  65. log.Infof("[server register app] request:%v.", utils.ToString(request))
  66. result, err := w.requestServer("/v2/app/create", request)
  67. log.Infof("[server register app] resp data:%v.", string(result))
  68. if err != nil {
  69. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  70. return err
  71. }
  72. return nil
  73. }
  74. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  75. log.Infof("[server whitelist] request:%v.", request.String())
  76. response := WhiteData{}
  77. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  78. log.Infof("[server whitelist] resp data:%v.", string(result))
  79. if err != nil {
  80. return response, err
  81. }
  82. err = json.Unmarshal(result, &response)
  83. if err != nil {
  84. log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
  85. return response, err
  86. }
  87. return response, nil
  88. }
  89. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  90. log.Infof("[server whitelist] request:%v.", request.String())
  91. response := WhiteDataV2{}
  92. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  93. log.Infof("[server whitelist] resp data:%v.", string(result))
  94. if err != nil {
  95. return response, err
  96. }
  97. err = json.Unmarshal(result, &response)
  98. if err != nil {
  99. log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
  100. return response, err
  101. }
  102. return response, nil
  103. }
  104. type ServerHTTPWorker struct {
  105. currentIndex int // 标识当前使用的是那个链接方式
  106. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  107. mux *sync.Mutex
  108. accountID int
  109. hostID string
  110. registUrl string
  111. installReport string
  112. heartUrl string
  113. taskUrl string
  114. returnTaskUrl string
  115. installAgentUrl string
  116. token string
  117. connectTestUrl string
  118. doccServer string
  119. daemonId string
  120. connectList []string
  121. tokenList []string
  122. //nodeInfo *NodeInfo
  123. prefix string
  124. proxyClient *Client
  125. }
  126. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  127. s := &ServerHTTPWorker{
  128. accountID: 1,
  129. //daemonId: daemonId,
  130. //currentIndex: -1,
  131. //connectList: strings.Split(connect, ","),
  132. //tokenList: strings.Split(token, ","),
  133. mux: new(sync.Mutex),
  134. }
  135. s.prefix = *flags.ServerPrefix
  136. s.proxyClient, _ = GetProxyClient()
  137. //s.nodeInfo,_ = node.GetNodeInfo()
  138. return s, nil
  139. }
  140. // func (w *serverHTTPWorker) updateUrl(index int) int {
  141. // w.mux.Lock()
  142. // defer w.mux.Unlock()
  143. // // if index != w.currentIndex {
  144. // // return w.currentIndex
  145. // // }
  146. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  147. // connect := ""
  148. // length := len(w.tokenList)
  149. // if length > 0 {
  150. // w.token = w.tokenList[0]
  151. // }
  152. //
  153. // // if !strings.HasSuffix(connect, "http") {
  154. // // connect = fmt.Sprintf("http://%s", connect)
  155. // // }
  156. //
  157. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  158. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  159. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  160. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  161. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  162. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  163. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  164. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  165. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  166. // return w.currentIndex
  167. // }
  168. //
  169. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  170. // response := RegistorRespData{}
  171. // data, err := json.Marshal(r)
  172. // if err != nil {
  173. // log.Errorf("register marshal request error:%s.", err)
  174. // return response, err
  175. // }
  176. // log.Infof("register request %s", string(data))
  177. // result, err := w.requestServer(registerModel, data)
  178. // if err != nil {
  179. // return response, err
  180. // }
  181. //
  182. // err = json.Unmarshal(result, &response)
  183. // if err != nil {
  184. // log.Errorf("register unmarshal response error:%s.", err)
  185. // return response, err
  186. // }
  187. //
  188. // return response, nil
  189. // }
  190. //
  191. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  192. // data, err := json.Marshal(r)
  193. // if err != nil {
  194. // log.Errorf("install report marshal request error:%s.", err)
  195. // return err
  196. // }
  197. // log.Infof("install report request %s", string(data))
  198. // if _, err := w.requestServer(installReportModel, data); err != nil {
  199. // return err
  200. // }
  201. // return nil
  202. // }
  203. //
  204. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  205. // data, err := json.Marshal(r)
  206. // if err != nil {
  207. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  208. // return err
  209. // }
  210. // log.Infof("ConnectTest request:%s.", string(data))
  211. // _, err = w.requestServer(connectTestModel, data)
  212. // return err
  213. // }
  214. //
  215. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  216. // response := RegistorRespData{}
  217. // data, err := json.Marshal(r)
  218. // if err != nil {
  219. // log.Errorf("sync agent runner serialize error:%s.", err)
  220. // return response.Agents, err
  221. // }
  222. // log.Infof("sync agent runner request:%s.", string(data))
  223. // result, err := w.requestServer(syncAgentInfoModel, data)
  224. // if err != nil {
  225. // return response.Agents, err
  226. // }
  227. //
  228. // err = json.Unmarshal(result, &response)
  229. // if err != nil {
  230. // log.Errorf("sync agent runner parser error return:%s.", err)
  231. // return response.Agents, err
  232. // }
  233. //
  234. // return response.Agents, nil
  235. // }
  236. //
  237. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  238. // log.Infof("heartbeat request:%v.", request)
  239. // response := HB_Config{}
  240. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  241. // data, err := json.Marshal(request)
  242. // if err != nil {
  243. // log.Errorf("heartbeat marshal request error:%s.", err)
  244. // return response.HB_Config_Detail, err
  245. // }
  246. //
  247. // log.Infof("heartbeat request:%s.", string(data))
  248. // result, err := w.requestServer(heartModel, data)
  249. // if err != nil {
  250. // return response.HB_Config_Detail, err
  251. // }
  252. //
  253. // err = json.Unmarshal(result, &response)
  254. // if err != nil {
  255. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  256. // return response.HB_Config_Detail, err
  257. // }
  258. // return response.HB_Config_Detail, nil
  259. // }
  260. //
  261. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  262. // defer func() {
  263. // if err := recover(); err != nil {
  264. // log.Errorf("get task panic, error is <%v>", err)
  265. // }
  266. // }()
  267. // data, err := json.Marshal(request)
  268. // if err != nil {
  269. // log.Errorf("get task serialize get task request, error is <%s>", err)
  270. // return nil, err
  271. // }
  272. //
  273. // log.Infof("get task request:%s.", string(data))
  274. //
  275. // result := RespJson{}
  276. // // index := w.getIndex()
  277. // // for i := 0; i <= 0; i++ {
  278. // url := w.taskUrl
  279. // // fmt.Println("ReceiveTask:", url)
  280. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  281. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  282. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  283. // if err != nil {
  284. // log.WithError(err).Errorf("proxy server error")
  285. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  286. // }
  287. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  288. //
  289. // err = json.Unmarshal(response, &result)
  290. // if err != nil {
  291. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  292. // url, err, string(response))
  293. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  294. // url, err, string(response))
  295. //
  296. // }
  297. //
  298. // if result.Code != 100000 {
  299. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  300. // url, result.Code, result.Msg)
  301. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  302. // url, result.Code, result.Msg)
  303. // }
  304. //
  305. // if err != nil {
  306. // log.Errorf("failed with request task, error is <%v>", err)
  307. // return nil, err
  308. // }
  309. //
  310. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  311. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  312. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  313. //
  314. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  315. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  316. // if err != nil {
  317. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  318. // return nil, err
  319. // }
  320. //
  321. // log.Infof("task info is <%s>", string(plainText))
  322. // err = json.Unmarshal(plainText, &tasks)
  323. // if err != nil {
  324. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  325. // return nil, err
  326. // }
  327. //
  328. // return tasks, nil
  329. // }
  330. //
  331. // err = json.Unmarshal(result.Data, &tasks)
  332. // if err != nil {
  333. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  334. // return nil, err
  335. // }
  336. // return tasks, nil
  337. // }
  338. //
  339. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  340. // data, err := json.Marshal(taskResult)
  341. // if err != nil {
  342. // log.Errorf("report task result serialize error:%s.", err)
  343. // return nil, err
  344. // }
  345. // log.Infof("report task result request:%s.", string(data))
  346. //
  347. // data, err = w.requestServer(reportTaskResultModel, data)
  348. // return data, err
  349. // }
  350. //
  351. // func (w *serverHTTPWorker) getIndex() int {
  352. // w.mux.Lock()
  353. // defer w.mux.Unlock()
  354. // return w.currentIndex
  355. // }
  356. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  357. uri = w.prefix + uri
  358. byteData, err := json.Marshal(data)
  359. if err != nil {
  360. log.WithError(err).Errorf("[requestServer] marshal request error.")
  361. return byteData, err
  362. }
  363. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  364. map[string]string{"Content-Type": "application/json"})
  365. if err != nil {
  366. log.WithError(err).Errorf("[requestServer] proxy server error")
  367. // index = w.updateUrl(index)
  368. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  369. }
  370. log.Infof("url %s response %s", uri, string(response))
  371. result := RespJson{}
  372. err = json.Unmarshal(response, &result)
  373. if err != nil {
  374. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  375. // index = w.updateUrl(index)
  376. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  377. string(response))
  378. }
  379. if result.Code != 1000 {
  380. // index = w.updateUrl(index)
  381. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  382. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  383. result.Msg)
  384. }
  385. return result.Data, nil
  386. }
  387. //
  388. //func (w *serverHTTPWorker) SetToken(token string) {
  389. // w.token = token
  390. //}
  391. //
  392. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  393. // return w.doccServer, w.token
  394. //}