serverWorker.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. . "github.com/coroot/coroot-node-agent/utils/modelse"
  8. log "github.com/sirupsen/logrus"
  9. "net/http"
  10. "sync"
  11. //
  12. //log "github.com/sirupsen/logrus"
  13. )
  14. const (
  15. registerModel = 0
  16. connectTestModel = iota
  17. syncAgentInfoModel
  18. heartModel
  19. receiveTaskModel
  20. reportTaskResultModel
  21. installReportModel
  22. urlModel = "%s/api/ext/gaia/daemon/%s"
  23. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  24. )
  25. type ServerWorker interface {
  26. //InstallReport(r ReportRequest) error
  27. RegisterHost(RegisterHostReq) error
  28. RegisterApp(RegisterAppReq) error
  29. WhiteList(WhiteListReq) (WhiteData, error)
  30. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  31. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  32. //ReceiveTask([]interface{}) ([]Task, error)
  33. //ReportTaskResult([]TaskResult) ([]byte, error)
  34. //SetToken(token string)
  35. //GetInfo() (string, string)
  36. }
  37. func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error {
  38. log.Infof("[server register host] request:%v.", utils.ToString(request))
  39. result, err := w.requestServer("/v2/app/registerHost", request)
  40. log.Infof("[server register host] resp:%v.", string(result))
  41. if err != nil {
  42. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  43. return err
  44. }
  45. return nil
  46. }
  47. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  48. log.Infof("[server register app] request:%v.", utils.ToString(request))
  49. result, err := w.requestServer("/v2/app/create", request)
  50. log.Infof("[server register app] resp data:%v.", string(result))
  51. if err != nil {
  52. log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
  53. return err
  54. }
  55. return nil
  56. }
  57. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  58. log.Infof("[server whitelist] request:%v.", request.String())
  59. response := WhiteData{}
  60. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  61. log.Infof("[server whitelist] resp data:%v.", string(result))
  62. if err != nil {
  63. return response, err
  64. }
  65. err = json.Unmarshal(result, &response)
  66. if err != nil {
  67. log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
  68. return response, err
  69. }
  70. return response, nil
  71. }
  72. type ServerHTTPWorker struct {
  73. currentIndex int // 标识当前使用的是那个链接方式
  74. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  75. mux *sync.Mutex
  76. accountID int
  77. hostID string
  78. registUrl string
  79. installReport string
  80. heartUrl string
  81. taskUrl string
  82. returnTaskUrl string
  83. installAgentUrl string
  84. token string
  85. connectTestUrl string
  86. doccServer string
  87. daemonId string
  88. connectList []string
  89. tokenList []string
  90. //nodeInfo *NodeInfo
  91. prefix string
  92. proxyClient *Client
  93. }
  94. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  95. s := &ServerHTTPWorker{
  96. accountID: 1,
  97. //daemonId: daemonId,
  98. //currentIndex: -1,
  99. //connectList: strings.Split(connect, ","),
  100. //tokenList: strings.Split(token, ","),
  101. mux: new(sync.Mutex),
  102. }
  103. s.prefix = *flags.ServerPrefix
  104. s.proxyClient, _ = GetProxyClient()
  105. //s.nodeInfo,_ = node.GetNodeInfo()
  106. return s, nil
  107. }
  108. // func (w *serverHTTPWorker) updateUrl(index int) int {
  109. // w.mux.Lock()
  110. // defer w.mux.Unlock()
  111. // // if index != w.currentIndex {
  112. // // return w.currentIndex
  113. // // }
  114. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  115. // connect := ""
  116. // length := len(w.tokenList)
  117. // if length > 0 {
  118. // w.token = w.tokenList[0]
  119. // }
  120. //
  121. // // if !strings.HasSuffix(connect, "http") {
  122. // // connect = fmt.Sprintf("http://%s", connect)
  123. // // }
  124. //
  125. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  126. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  127. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  128. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  129. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  130. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  131. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  132. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  133. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  134. // return w.currentIndex
  135. // }
  136. //
  137. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  138. // response := RegistorRespData{}
  139. // data, err := json.Marshal(r)
  140. // if err != nil {
  141. // log.Errorf("register marshal request error:%s.", err)
  142. // return response, err
  143. // }
  144. // log.Infof("register request %s", string(data))
  145. // result, err := w.requestServer(registerModel, data)
  146. // if err != nil {
  147. // return response, err
  148. // }
  149. //
  150. // err = json.Unmarshal(result, &response)
  151. // if err != nil {
  152. // log.Errorf("register unmarshal response error:%s.", err)
  153. // return response, err
  154. // }
  155. //
  156. // return response, nil
  157. // }
  158. //
  159. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  160. // data, err := json.Marshal(r)
  161. // if err != nil {
  162. // log.Errorf("install report marshal request error:%s.", err)
  163. // return err
  164. // }
  165. // log.Infof("install report request %s", string(data))
  166. // if _, err := w.requestServer(installReportModel, data); err != nil {
  167. // return err
  168. // }
  169. // return nil
  170. // }
  171. //
  172. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  173. // data, err := json.Marshal(r)
  174. // if err != nil {
  175. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  176. // return err
  177. // }
  178. // log.Infof("ConnectTest request:%s.", string(data))
  179. // _, err = w.requestServer(connectTestModel, data)
  180. // return err
  181. // }
  182. //
  183. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  184. // response := RegistorRespData{}
  185. // data, err := json.Marshal(r)
  186. // if err != nil {
  187. // log.Errorf("sync agent runner serialize error:%s.", err)
  188. // return response.Agents, err
  189. // }
  190. // log.Infof("sync agent runner request:%s.", string(data))
  191. // result, err := w.requestServer(syncAgentInfoModel, data)
  192. // if err != nil {
  193. // return response.Agents, err
  194. // }
  195. //
  196. // err = json.Unmarshal(result, &response)
  197. // if err != nil {
  198. // log.Errorf("sync agent runner parser error return:%s.", err)
  199. // return response.Agents, err
  200. // }
  201. //
  202. // return response.Agents, nil
  203. // }
  204. //
  205. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  206. // log.Infof("heartbeat request:%v.", request)
  207. // response := HB_Config{}
  208. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  209. // data, err := json.Marshal(request)
  210. // if err != nil {
  211. // log.Errorf("heartbeat marshal request error:%s.", err)
  212. // return response.HB_Config_Detail, err
  213. // }
  214. //
  215. // log.Infof("heartbeat request:%s.", string(data))
  216. // result, err := w.requestServer(heartModel, data)
  217. // if err != nil {
  218. // return response.HB_Config_Detail, err
  219. // }
  220. //
  221. // err = json.Unmarshal(result, &response)
  222. // if err != nil {
  223. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  224. // return response.HB_Config_Detail, err
  225. // }
  226. // return response.HB_Config_Detail, nil
  227. // }
  228. //
  229. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  230. // defer func() {
  231. // if err := recover(); err != nil {
  232. // log.Errorf("get task panic, error is <%v>", err)
  233. // }
  234. // }()
  235. // data, err := json.Marshal(request)
  236. // if err != nil {
  237. // log.Errorf("get task serialize get task request, error is <%s>", err)
  238. // return nil, err
  239. // }
  240. //
  241. // log.Infof("get task request:%s.", string(data))
  242. //
  243. // result := RespJson{}
  244. // // index := w.getIndex()
  245. // // for i := 0; i <= 0; i++ {
  246. // url := w.taskUrl
  247. // // fmt.Println("ReceiveTask:", url)
  248. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  249. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  250. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  251. // if err != nil {
  252. // log.WithError(err).Errorf("proxy server error")
  253. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  254. // }
  255. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  256. //
  257. // err = json.Unmarshal(response, &result)
  258. // if err != nil {
  259. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  260. // url, err, string(response))
  261. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  262. // url, err, string(response))
  263. //
  264. // }
  265. //
  266. // if result.Code != 100000 {
  267. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  268. // url, result.Code, result.Msg)
  269. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  270. // url, result.Code, result.Msg)
  271. // }
  272. //
  273. // if err != nil {
  274. // log.Errorf("failed with request task, error is <%v>", err)
  275. // return nil, err
  276. // }
  277. //
  278. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  279. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  280. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  281. //
  282. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  283. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  284. // if err != nil {
  285. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  286. // return nil, err
  287. // }
  288. //
  289. // log.Infof("task info is <%s>", string(plainText))
  290. // err = json.Unmarshal(plainText, &tasks)
  291. // if err != nil {
  292. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  293. // return nil, err
  294. // }
  295. //
  296. // return tasks, nil
  297. // }
  298. //
  299. // err = json.Unmarshal(result.Data, &tasks)
  300. // if err != nil {
  301. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  302. // return nil, err
  303. // }
  304. // return tasks, nil
  305. // }
  306. //
  307. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  308. // data, err := json.Marshal(taskResult)
  309. // if err != nil {
  310. // log.Errorf("report task result serialize error:%s.", err)
  311. // return nil, err
  312. // }
  313. // log.Infof("report task result request:%s.", string(data))
  314. //
  315. // data, err = w.requestServer(reportTaskResultModel, data)
  316. // return data, err
  317. // }
  318. //
  319. // func (w *serverHTTPWorker) getIndex() int {
  320. // w.mux.Lock()
  321. // defer w.mux.Unlock()
  322. // return w.currentIndex
  323. // }
  324. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  325. uri = w.prefix + uri
  326. byteData, err := json.Marshal(data)
  327. if err != nil {
  328. log.WithError(err).Errorf("[requestServer] marshal request error.")
  329. return byteData, err
  330. }
  331. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  332. map[string]string{"Content-Type": "application/json"})
  333. if err != nil {
  334. log.WithError(err).Errorf("[requestServer] proxy server error")
  335. // index = w.updateUrl(index)
  336. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  337. }
  338. log.Infof("url %s response %s", uri, string(response))
  339. result := RespJson{}
  340. err = json.Unmarshal(response, &result)
  341. if err != nil {
  342. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  343. // index = w.updateUrl(index)
  344. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  345. string(response))
  346. }
  347. if result.Code != 1000 {
  348. // index = w.updateUrl(index)
  349. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  350. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  351. result.Msg)
  352. }
  353. return result.Data, nil
  354. }
  355. //
  356. //func (w *serverHTTPWorker) SetToken(token string) {
  357. // w.token = token
  358. //}
  359. //
  360. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  361. // return w.doccServer, w.token
  362. //}