serverWorker.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/utils"
  6. . "github.com/coroot/coroot-node-agent/utils/modelse"
  7. log "github.com/sirupsen/logrus"
  8. "net/http"
  9. "sync"
  10. //
  11. //log "github.com/sirupsen/logrus"
  12. )
  13. const (
  14. registerModel = 0
  15. connectTestModel = iota
  16. syncAgentInfoModel
  17. heartModel
  18. receiveTaskModel
  19. reportTaskResultModel
  20. installReportModel
  21. urlModel = "%s/api/ext/gaia/daemon/%s"
  22. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  23. )
  24. type ServerWorker interface {
  25. //InstallReport(r ReportRequest) error
  26. RegisterHost(RegisterHostReq) error
  27. RegisterApp(RegisterAppReq) error
  28. WhiteList(WhiteListReq) (WhiteData, error)
  29. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  30. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  31. //ReceiveTask([]interface{}) ([]Task, error)
  32. //ReportTaskResult([]TaskResult) ([]byte, error)
  33. //SetToken(token string)
  34. //GetInfo() (string, string)
  35. }
  36. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error {
  37. log.Infof("[server register host] request:%v.", utils.ToString(request))
  38. result, err := w.requestServer("/v2/app/registerHost", request)
  39. log.Infof("[server register host] resp:%v.", string(result))
  40. if err != nil {
  41. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  42. return err
  43. }
  44. return nil
  45. }
  46. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  47. log.Infof("[server register app] request:%v.", utils.ToString(request))
  48. result, err := w.requestServer("/v2/app/create", request)
  49. log.Infof("[server register app] resp data:%v.", string(result))
  50. if err != nil {
  51. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  52. return err
  53. }
  54. return nil
  55. }
  56. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  57. log.Infof("[server whitelist] request:%v.", request.String())
  58. response := WhiteData{}
  59. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  60. log.Infof("[server whitelist] resp data:%v.", string(result))
  61. if err != nil {
  62. return response, err
  63. }
  64. err = json.Unmarshal(result, &response)
  65. if err != nil {
  66. log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
  67. return response, err
  68. }
  69. return response, nil
  70. }
  71. type ServerHTTPWorker struct {
  72. currentIndex int // 标识当前使用的是那个链接方式
  73. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  74. mux *sync.Mutex
  75. accountID int
  76. hostID string
  77. registUrl string
  78. installReport string
  79. heartUrl string
  80. taskUrl string
  81. returnTaskUrl string
  82. installAgentUrl string
  83. token string
  84. connectTestUrl string
  85. doccServer string
  86. daemonId string
  87. connectList []string
  88. tokenList []string
  89. //nodeInfo *NodeInfo
  90. proxyClient *Client
  91. }
  92. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  93. s := &ServerHTTPWorker{
  94. accountID: 1,
  95. //daemonId: daemonId,
  96. //currentIndex: -1,
  97. //connectList: strings.Split(connect, ","),
  98. //tokenList: strings.Split(token, ","),
  99. mux: new(sync.Mutex),
  100. }
  101. s.proxyClient, _ = GetProxyClient()
  102. //s.nodeInfo,_ = node.GetNodeInfo()
  103. return s, nil
  104. }
  105. // func (w *serverHTTPWorker) updateUrl(index int) int {
  106. // w.mux.Lock()
  107. // defer w.mux.Unlock()
  108. // // if index != w.currentIndex {
  109. // // return w.currentIndex
  110. // // }
  111. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  112. // connect := ""
  113. // length := len(w.tokenList)
  114. // if length > 0 {
  115. // w.token = w.tokenList[0]
  116. // }
  117. //
  118. // // if !strings.HasSuffix(connect, "http") {
  119. // // connect = fmt.Sprintf("http://%s", connect)
  120. // // }
  121. //
  122. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  123. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  124. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  125. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  126. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  127. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  128. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  129. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  130. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  131. // return w.currentIndex
  132. // }
  133. //
  134. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  135. // response := RegistorRespData{}
  136. // data, err := json.Marshal(r)
  137. // if err != nil {
  138. // log.Errorf("register marshal request error:%s.", err)
  139. // return response, err
  140. // }
  141. // log.Infof("register request %s", string(data))
  142. // result, err := w.requestServer(registerModel, data)
  143. // if err != nil {
  144. // return response, err
  145. // }
  146. //
  147. // err = json.Unmarshal(result, &response)
  148. // if err != nil {
  149. // log.Errorf("register unmarshal response error:%s.", err)
  150. // return response, err
  151. // }
  152. //
  153. // return response, nil
  154. // }
  155. //
  156. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  157. // data, err := json.Marshal(r)
  158. // if err != nil {
  159. // log.Errorf("install report marshal request error:%s.", err)
  160. // return err
  161. // }
  162. // log.Infof("install report request %s", string(data))
  163. // if _, err := w.requestServer(installReportModel, data); err != nil {
  164. // return err
  165. // }
  166. // return nil
  167. // }
  168. //
  169. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  170. // data, err := json.Marshal(r)
  171. // if err != nil {
  172. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  173. // return err
  174. // }
  175. // log.Infof("ConnectTest request:%s.", string(data))
  176. // _, err = w.requestServer(connectTestModel, data)
  177. // return err
  178. // }
  179. //
  180. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  181. // response := RegistorRespData{}
  182. // data, err := json.Marshal(r)
  183. // if err != nil {
  184. // log.Errorf("sync agent runner serialize error:%s.", err)
  185. // return response.Agents, err
  186. // }
  187. // log.Infof("sync agent runner request:%s.", string(data))
  188. // result, err := w.requestServer(syncAgentInfoModel, data)
  189. // if err != nil {
  190. // return response.Agents, err
  191. // }
  192. //
  193. // err = json.Unmarshal(result, &response)
  194. // if err != nil {
  195. // log.Errorf("sync agent runner parser error return:%s.", err)
  196. // return response.Agents, err
  197. // }
  198. //
  199. // return response.Agents, nil
  200. // }
  201. //
  202. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  203. // log.Infof("heartbeat request:%v.", request)
  204. // response := HB_Config{}
  205. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  206. // data, err := json.Marshal(request)
  207. // if err != nil {
  208. // log.Errorf("heartbeat marshal request error:%s.", err)
  209. // return response.HB_Config_Detail, err
  210. // }
  211. //
  212. // log.Infof("heartbeat request:%s.", string(data))
  213. // result, err := w.requestServer(heartModel, data)
  214. // if err != nil {
  215. // return response.HB_Config_Detail, err
  216. // }
  217. //
  218. // err = json.Unmarshal(result, &response)
  219. // if err != nil {
  220. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  221. // return response.HB_Config_Detail, err
  222. // }
  223. // return response.HB_Config_Detail, nil
  224. // }
  225. //
  226. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  227. // defer func() {
  228. // if err := recover(); err != nil {
  229. // log.Errorf("get task panic, error is <%v>", err)
  230. // }
  231. // }()
  232. // data, err := json.Marshal(request)
  233. // if err != nil {
  234. // log.Errorf("get task serialize get task request, error is <%s>", err)
  235. // return nil, err
  236. // }
  237. //
  238. // log.Infof("get task request:%s.", string(data))
  239. //
  240. // result := RespJson{}
  241. // // index := w.getIndex()
  242. // // for i := 0; i <= 0; i++ {
  243. // url := w.taskUrl
  244. // // fmt.Println("ReceiveTask:", url)
  245. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  246. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  247. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  248. // if err != nil {
  249. // log.WithError(err).Errorf("proxy server error")
  250. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  251. // }
  252. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  253. //
  254. // err = json.Unmarshal(response, &result)
  255. // if err != nil {
  256. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  257. // url, err, string(response))
  258. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  259. // url, err, string(response))
  260. //
  261. // }
  262. //
  263. // if result.Code != 100000 {
  264. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  265. // url, result.Code, result.Msg)
  266. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  267. // url, result.Code, result.Msg)
  268. // }
  269. //
  270. // if err != nil {
  271. // log.Errorf("failed with request task, error is <%v>", err)
  272. // return nil, err
  273. // }
  274. //
  275. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  276. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  277. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  278. //
  279. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  280. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  281. // if err != nil {
  282. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  283. // return nil, err
  284. // }
  285. //
  286. // log.Infof("task info is <%s>", string(plainText))
  287. // err = json.Unmarshal(plainText, &tasks)
  288. // if err != nil {
  289. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  290. // return nil, err
  291. // }
  292. //
  293. // return tasks, nil
  294. // }
  295. //
  296. // err = json.Unmarshal(result.Data, &tasks)
  297. // if err != nil {
  298. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  299. // return nil, err
  300. // }
  301. // return tasks, nil
  302. // }
  303. //
  304. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  305. // data, err := json.Marshal(taskResult)
  306. // if err != nil {
  307. // log.Errorf("report task result serialize error:%s.", err)
  308. // return nil, err
  309. // }
  310. // log.Infof("report task result request:%s.", string(data))
  311. //
  312. // data, err = w.requestServer(reportTaskResultModel, data)
  313. // return data, err
  314. // }
  315. //
  316. // func (w *serverHTTPWorker) getIndex() int {
  317. // w.mux.Lock()
  318. // defer w.mux.Unlock()
  319. // return w.currentIndex
  320. // }
  321. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  322. byteData, err := json.Marshal(data)
  323. if err != nil {
  324. log.WithError(err).Errorf("[requestServer] marshal request error.")
  325. return byteData, err
  326. }
  327. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  328. map[string]string{"Content-Type": "application/json"})
  329. if err != nil {
  330. log.WithError(err).Errorf("[requestServer] proxy server error")
  331. // index = w.updateUrl(index)
  332. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  333. }
  334. log.Infof("url %s response %s", uri, string(response))
  335. result := RespJson{}
  336. err = json.Unmarshal(response, &result)
  337. if err != nil {
  338. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  339. // index = w.updateUrl(index)
  340. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  341. string(response))
  342. }
  343. if result.Code != 1000 {
  344. // index = w.updateUrl(index)
  345. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  346. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  347. result.Msg)
  348. }
  349. return result.Data, nil
  350. }
  351. //
  352. //func (w *serverHTTPWorker) SetToken(token string) {
  353. // w.token = token
  354. //}
  355. //
  356. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  357. // return w.doccServer, w.token
  358. //}