serverWorker.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. . "github.com/coroot/coroot-node-agent/utils/modelse"
  8. log "github.com/sirupsen/logrus"
  9. "net/http"
  10. "sync"
  11. //
  12. //log "github.com/sirupsen/logrus"
  13. )
  14. const (
  15. registerModel = 0
  16. connectTestModel = iota
  17. syncAgentInfoModel
  18. heartModel
  19. receiveTaskModel
  20. reportTaskResultModel
  21. installReportModel
  22. urlModel = "%s/api/ext/gaia/daemon/%s"
  23. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  24. )
  25. type ServerWorker interface {
  26. //InstallReport(r ReportRequest) error
  27. RegisterHost(RegisterHostReq) error
  28. RegisterApp(RegisterAppReq) error
  29. WhiteList(WhiteListReq) (WhiteData, error)
  30. WhiteListV2(WhiteListReq) (WhiteDataV2, error)
  31. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  32. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  33. //ReceiveTask([]interface{}) ([]Task, error)
  34. //ReportTaskResult([]TaskResult) ([]byte, error)
  35. //SetToken(token string)
  36. //GetInfo() (string, string)
  37. }
  38. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error {
  39. log.Infof("[server register host] request:%v.", utils.ToString(request))
  40. result, err := w.requestServer("/v2/app/registerHost", request)
  41. log.Infof("[server register host] resp:%v.", string(result))
  42. if err != nil {
  43. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  44. return err
  45. }
  46. return nil
  47. }
  48. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  49. log.Infof("[server register app] request:%v.", utils.ToString(request))
  50. result, err := w.requestServer("/v2/app/create", request)
  51. log.Infof("[server register app] resp data:%v.", string(result))
  52. if err != nil {
  53. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  54. return err
  55. }
  56. return nil
  57. }
  58. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  59. log.Infof("[server whitelist] request:%v.", request.String())
  60. response := WhiteData{}
  61. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  62. log.Infof("[server whitelist] resp data:%v.", string(result))
  63. if err != nil {
  64. return response, err
  65. }
  66. err = json.Unmarshal(result, &response)
  67. if err != nil {
  68. log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
  69. return response, err
  70. }
  71. return response, nil
  72. }
  73. func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
  74. log.Infof("[server whitelist] request:%v.", request.String())
  75. response := WhiteDataV2{}
  76. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  77. log.Infof("[server whitelist] resp data:%v.", string(result))
  78. if err != nil {
  79. return response, err
  80. }
  81. err = json.Unmarshal(result, &response)
  82. if err != nil {
  83. log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
  84. return response, err
  85. }
  86. return response, nil
  87. }
  88. type ServerHTTPWorker struct {
  89. currentIndex int // 标识当前使用的是那个链接方式
  90. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  91. mux *sync.Mutex
  92. accountID int
  93. hostID string
  94. registUrl string
  95. installReport string
  96. heartUrl string
  97. taskUrl string
  98. returnTaskUrl string
  99. installAgentUrl string
  100. token string
  101. connectTestUrl string
  102. doccServer string
  103. daemonId string
  104. connectList []string
  105. tokenList []string
  106. //nodeInfo *NodeInfo
  107. prefix string
  108. proxyClient *Client
  109. }
  110. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  111. s := &ServerHTTPWorker{
  112. accountID: 1,
  113. //daemonId: daemonId,
  114. //currentIndex: -1,
  115. //connectList: strings.Split(connect, ","),
  116. //tokenList: strings.Split(token, ","),
  117. mux: new(sync.Mutex),
  118. }
  119. s.prefix = *flags.ServerPrefix
  120. s.proxyClient, _ = GetProxyClient()
  121. //s.nodeInfo,_ = node.GetNodeInfo()
  122. return s, nil
  123. }
  124. // func (w *serverHTTPWorker) updateUrl(index int) int {
  125. // w.mux.Lock()
  126. // defer w.mux.Unlock()
  127. // // if index != w.currentIndex {
  128. // // return w.currentIndex
  129. // // }
  130. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  131. // connect := ""
  132. // length := len(w.tokenList)
  133. // if length > 0 {
  134. // w.token = w.tokenList[0]
  135. // }
  136. //
  137. // // if !strings.HasSuffix(connect, "http") {
  138. // // connect = fmt.Sprintf("http://%s", connect)
  139. // // }
  140. //
  141. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  142. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  143. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  144. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  145. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  146. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  147. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  148. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  149. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  150. // return w.currentIndex
  151. // }
  152. //
  153. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  154. // response := RegistorRespData{}
  155. // data, err := json.Marshal(r)
  156. // if err != nil {
  157. // log.Errorf("register marshal request error:%s.", err)
  158. // return response, err
  159. // }
  160. // log.Infof("register request %s", string(data))
  161. // result, err := w.requestServer(registerModel, data)
  162. // if err != nil {
  163. // return response, err
  164. // }
  165. //
  166. // err = json.Unmarshal(result, &response)
  167. // if err != nil {
  168. // log.Errorf("register unmarshal response error:%s.", err)
  169. // return response, err
  170. // }
  171. //
  172. // return response, nil
  173. // }
  174. //
  175. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  176. // data, err := json.Marshal(r)
  177. // if err != nil {
  178. // log.Errorf("install report marshal request error:%s.", err)
  179. // return err
  180. // }
  181. // log.Infof("install report request %s", string(data))
  182. // if _, err := w.requestServer(installReportModel, data); err != nil {
  183. // return err
  184. // }
  185. // return nil
  186. // }
  187. //
  188. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  189. // data, err := json.Marshal(r)
  190. // if err != nil {
  191. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  192. // return err
  193. // }
  194. // log.Infof("ConnectTest request:%s.", string(data))
  195. // _, err = w.requestServer(connectTestModel, data)
  196. // return err
  197. // }
  198. //
  199. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  200. // response := RegistorRespData{}
  201. // data, err := json.Marshal(r)
  202. // if err != nil {
  203. // log.Errorf("sync agent runner serialize error:%s.", err)
  204. // return response.Agents, err
  205. // }
  206. // log.Infof("sync agent runner request:%s.", string(data))
  207. // result, err := w.requestServer(syncAgentInfoModel, data)
  208. // if err != nil {
  209. // return response.Agents, err
  210. // }
  211. //
  212. // err = json.Unmarshal(result, &response)
  213. // if err != nil {
  214. // log.Errorf("sync agent runner parser error return:%s.", err)
  215. // return response.Agents, err
  216. // }
  217. //
  218. // return response.Agents, nil
  219. // }
  220. //
  221. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  222. // log.Infof("heartbeat request:%v.", request)
  223. // response := HB_Config{}
  224. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  225. // data, err := json.Marshal(request)
  226. // if err != nil {
  227. // log.Errorf("heartbeat marshal request error:%s.", err)
  228. // return response.HB_Config_Detail, err
  229. // }
  230. //
  231. // log.Infof("heartbeat request:%s.", string(data))
  232. // result, err := w.requestServer(heartModel, data)
  233. // if err != nil {
  234. // return response.HB_Config_Detail, err
  235. // }
  236. //
  237. // err = json.Unmarshal(result, &response)
  238. // if err != nil {
  239. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  240. // return response.HB_Config_Detail, err
  241. // }
  242. // return response.HB_Config_Detail, nil
  243. // }
  244. //
  245. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  246. // defer func() {
  247. // if err := recover(); err != nil {
  248. // log.Errorf("get task panic, error is <%v>", err)
  249. // }
  250. // }()
  251. // data, err := json.Marshal(request)
  252. // if err != nil {
  253. // log.Errorf("get task serialize get task request, error is <%s>", err)
  254. // return nil, err
  255. // }
  256. //
  257. // log.Infof("get task request:%s.", string(data))
  258. //
  259. // result := RespJson{}
  260. // // index := w.getIndex()
  261. // // for i := 0; i <= 0; i++ {
  262. // url := w.taskUrl
  263. // // fmt.Println("ReceiveTask:", url)
  264. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  265. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  266. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  267. // if err != nil {
  268. // log.WithError(err).Errorf("proxy server error")
  269. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  270. // }
  271. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  272. //
  273. // err = json.Unmarshal(response, &result)
  274. // if err != nil {
  275. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  276. // url, err, string(response))
  277. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  278. // url, err, string(response))
  279. //
  280. // }
  281. //
  282. // if result.Code != 100000 {
  283. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  284. // url, result.Code, result.Msg)
  285. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  286. // url, result.Code, result.Msg)
  287. // }
  288. //
  289. // if err != nil {
  290. // log.Errorf("failed with request task, error is <%v>", err)
  291. // return nil, err
  292. // }
  293. //
  294. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  295. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  296. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  297. //
  298. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  299. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  300. // if err != nil {
  301. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  302. // return nil, err
  303. // }
  304. //
  305. // log.Infof("task info is <%s>", string(plainText))
  306. // err = json.Unmarshal(plainText, &tasks)
  307. // if err != nil {
  308. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  309. // return nil, err
  310. // }
  311. //
  312. // return tasks, nil
  313. // }
  314. //
  315. // err = json.Unmarshal(result.Data, &tasks)
  316. // if err != nil {
  317. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  318. // return nil, err
  319. // }
  320. // return tasks, nil
  321. // }
  322. //
  323. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  324. // data, err := json.Marshal(taskResult)
  325. // if err != nil {
  326. // log.Errorf("report task result serialize error:%s.", err)
  327. // return nil, err
  328. // }
  329. // log.Infof("report task result request:%s.", string(data))
  330. //
  331. // data, err = w.requestServer(reportTaskResultModel, data)
  332. // return data, err
  333. // }
  334. //
  335. // func (w *serverHTTPWorker) getIndex() int {
  336. // w.mux.Lock()
  337. // defer w.mux.Unlock()
  338. // return w.currentIndex
  339. // }
  340. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  341. uri = w.prefix + uri
  342. byteData, err := json.Marshal(data)
  343. if err != nil {
  344. log.WithError(err).Errorf("[requestServer] marshal request error.")
  345. return byteData, err
  346. }
  347. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  348. map[string]string{"Content-Type": "application/json"})
  349. if err != nil {
  350. log.WithError(err).Errorf("[requestServer] proxy server error")
  351. // index = w.updateUrl(index)
  352. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  353. }
  354. log.Infof("url %s response %s", uri, string(response))
  355. result := RespJson{}
  356. err = json.Unmarshal(response, &result)
  357. if err != nil {
  358. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  359. // index = w.updateUrl(index)
  360. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  361. string(response))
  362. }
  363. if result.Code != 1000 {
  364. // index = w.updateUrl(index)
  365. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  366. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  367. result.Msg)
  368. }
  369. return result.Data, nil
  370. }
  371. //
  372. //func (w *serverHTTPWorker) SetToken(token string) {
  373. // w.token = token
  374. //}
  375. //
  376. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  377. // return w.doccServer, w.token
  378. //}