serverWorker.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. . "github.com/coroot/coroot-node-agent/utils/modelse"
  6. log "github.com/sirupsen/logrus"
  7. "net/http"
  8. "sync"
  9. //
  10. //log "github.com/sirupsen/logrus"
  11. )
  12. const (
  13. registerModel = 0
  14. connectTestModel = iota
  15. syncAgentInfoModel
  16. heartModel
  17. receiveTaskModel
  18. reportTaskResultModel
  19. installReportModel
  20. urlModel = "%s/api/ext/gaia/daemon/%s"
  21. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  22. )
  23. type ServerWorker interface {
  24. //InstallReport(r ReportRequest) error
  25. RegisterApp(RegisterAppReq) error
  26. WhiteList(WhiteListReq) (WhiteData, error)
  27. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  28. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  29. //ReceiveTask([]interface{}) ([]Task, error)
  30. //ReportTaskResult([]TaskResult) ([]byte, error)
  31. //SetToken(token string)
  32. //GetInfo() (string, string)
  33. }
  34. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  35. log.Infof("Register App request:%v.", request.String())
  36. _, err := w.requestServer("/v2/app/create", request)
  37. if err != nil {
  38. log.WithError(err).Errorf("Failed RegisterApp request:%v.", request.String())
  39. return err
  40. }
  41. return nil
  42. }
  43. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  44. log.Infof("WhiteList request:%v.", request.String())
  45. response := WhiteData{}
  46. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  47. if err != nil {
  48. return response, err
  49. }
  50. err = json.Unmarshal(result, &response)
  51. if err != nil {
  52. log.WithError(err).Errorf("Failed RegisterApp request:%v.", request.String())
  53. return response, err
  54. }
  55. return response, nil
  56. }
  57. type ServerHTTPWorker struct {
  58. currentIndex int // 标识当前使用的是那个链接方式
  59. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  60. mux *sync.Mutex
  61. accountID int
  62. hostID string
  63. registUrl string
  64. installReport string
  65. heartUrl string
  66. taskUrl string
  67. returnTaskUrl string
  68. installAgentUrl string
  69. token string
  70. connectTestUrl string
  71. doccServer string
  72. daemonId string
  73. connectList []string
  74. tokenList []string
  75. proxyClient *Client
  76. }
  77. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  78. s := &ServerHTTPWorker{
  79. accountID: 1,
  80. //daemonId: daemonId,
  81. //currentIndex: -1,
  82. //connectList: strings.Split(connect, ","),
  83. //tokenList: strings.Split(token, ","),
  84. mux: new(sync.Mutex),
  85. }
  86. s.proxyClient, _ = GetProxyClient()
  87. return s, nil
  88. }
  89. // func (w *serverHTTPWorker) updateUrl(index int) int {
  90. // w.mux.Lock()
  91. // defer w.mux.Unlock()
  92. // // if index != w.currentIndex {
  93. // // return w.currentIndex
  94. // // }
  95. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  96. // connect := ""
  97. // length := len(w.tokenList)
  98. // if length > 0 {
  99. // w.token = w.tokenList[0]
  100. // }
  101. //
  102. // // if !strings.HasSuffix(connect, "http") {
  103. // // connect = fmt.Sprintf("http://%s", connect)
  104. // // }
  105. //
  106. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  107. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  108. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  109. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  110. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  111. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  112. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  113. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  114. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  115. // return w.currentIndex
  116. // }
  117. //
  118. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  119. // response := RegistorRespData{}
  120. // data, err := json.Marshal(r)
  121. // if err != nil {
  122. // log.Errorf("register marshal request error:%s.", err)
  123. // return response, err
  124. // }
  125. // log.Infof("register request %s", string(data))
  126. // result, err := w.requestServer(registerModel, data)
  127. // if err != nil {
  128. // return response, err
  129. // }
  130. //
  131. // err = json.Unmarshal(result, &response)
  132. // if err != nil {
  133. // log.Errorf("register unmarshal response error:%s.", err)
  134. // return response, err
  135. // }
  136. //
  137. // return response, nil
  138. // }
  139. //
  140. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  141. // data, err := json.Marshal(r)
  142. // if err != nil {
  143. // log.Errorf("install report marshal request error:%s.", err)
  144. // return err
  145. // }
  146. // log.Infof("install report request %s", string(data))
  147. // if _, err := w.requestServer(installReportModel, data); err != nil {
  148. // return err
  149. // }
  150. // return nil
  151. // }
  152. //
  153. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  154. // data, err := json.Marshal(r)
  155. // if err != nil {
  156. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  157. // return err
  158. // }
  159. // log.Infof("ConnectTest request:%s.", string(data))
  160. // _, err = w.requestServer(connectTestModel, data)
  161. // return err
  162. // }
  163. //
  164. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  165. // response := RegistorRespData{}
  166. // data, err := json.Marshal(r)
  167. // if err != nil {
  168. // log.Errorf("sync agent runner serialize error:%s.", err)
  169. // return response.Agents, err
  170. // }
  171. // log.Infof("sync agent runner request:%s.", string(data))
  172. // result, err := w.requestServer(syncAgentInfoModel, data)
  173. // if err != nil {
  174. // return response.Agents, err
  175. // }
  176. //
  177. // err = json.Unmarshal(result, &response)
  178. // if err != nil {
  179. // log.Errorf("sync agent runner parser error return:%s.", err)
  180. // return response.Agents, err
  181. // }
  182. //
  183. // return response.Agents, nil
  184. // }
  185. //
  186. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  187. // log.Infof("heartbeat request:%v.", request)
  188. // response := HB_Config{}
  189. // // On a Mac: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  190. // data, err := json.Marshal(request)
  191. // if err != nil {
  192. // log.Errorf("heartbeat marshal request error:%s.", err)
  193. // return response.HB_Config_Detail, err
  194. // }
  195. //
  196. // log.Infof("heartbeat request:%s.", string(data))
  197. // result, err := w.requestServer(heartModel, data)
  198. // if err != nil {
  199. // return response.HB_Config_Detail, err
  200. // }
  201. //
  202. // err = json.Unmarshal(result, &response)
  203. // if err != nil {
  204. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  205. // return response.HB_Config_Detail, err
  206. // }
  207. // return response.HB_Config_Detail, nil
  208. // }
  209. //
  210. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  211. // defer func() {
  212. // if err := recover(); err != nil {
  213. // log.Errorf("get task panic, error is <%v>", err)
  214. // }
  215. // }()
  216. // data, err := json.Marshal(request)
  217. // if err != nil {
  218. // log.Errorf("get task serialize get task request, error is <%s>", err)
  219. // return nil, err
  220. // }
  221. //
  222. // log.Infof("get task request:%s.", string(data))
  223. //
  224. // result := RespJson{}
  225. // // index := w.getIndex()
  226. // // for i := 0; i <= 0; i++ {
  227. // url := w.taskUrl
  228. // // fmt.Println("ReceiveTask:", url)
  229. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  230. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  231. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  232. // if err != nil {
  233. // log.WithError(err).Errorf("proxy server error")
  234. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  235. // }
  236. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  237. //
  238. // err = json.Unmarshal(response, &result)
  239. // if err != nil {
  240. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  241. // url, err, string(response))
  242. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  243. // url, err, string(response))
  244. //
  245. // }
  246. //
  247. // if result.Code != 100000 {
  248. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  249. // url, result.Code, result.Msg)
  250. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  251. // url, result.Code, result.Msg)
  252. // }
  253. //
  254. // if err != nil {
  255. // log.Errorf("failed with request task, error is <%v>", err)
  256. // return nil, err
  257. // }
  258. //
  259. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  260. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  261. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  262. //
  263. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  264. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  265. // if err != nil {
  266. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  267. // return nil, err
  268. // }
  269. //
  270. // log.Infof("task info is <%s>", string(plainText))
  271. // err = json.Unmarshal(plainText, &tasks)
  272. // if err != nil {
  273. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  274. // return nil, err
  275. // }
  276. //
  277. // return tasks, nil
  278. // }
  279. //
  280. // err = json.Unmarshal(result.Data, &tasks)
  281. // if err != nil {
  282. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  283. // return nil, err
  284. // }
  285. // return tasks, nil
  286. // }
  287. //
  288. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  289. // data, err := json.Marshal(taskResult)
  290. // if err != nil {
  291. // log.Errorf("report task result serialize error:%s.", err)
  292. // return nil, err
  293. // }
  294. // log.Infof("report task result request:%s.", string(data))
  295. //
  296. // data, err = w.requestServer(reportTaskResultModel, data)
  297. // return data, err
  298. // }
  299. //
  300. // func (w *serverHTTPWorker) getIndex() int {
  301. // w.mux.Lock()
  302. // defer w.mux.Unlock()
  303. // return w.currentIndex
  304. // }
  305. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  306. byteData, err := json.Marshal(data)
  307. if err != nil {
  308. log.Errorf("heartbeat marshal request error:%s.", err)
  309. return byteData, err
  310. }
  311. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  312. map[string]string{"Content-Type": "application/json"})
  313. if err != nil {
  314. log.WithError(err).Errorf("proxy server error")
  315. // index = w.updateUrl(index)
  316. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  317. }
  318. log.Infof("url %s response %s", uri, string(response))
  319. result := RespJson{}
  320. err = json.Unmarshal(response, &result)
  321. if err != nil {
  322. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  323. // index = w.updateUrl(index)
  324. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  325. string(response))
  326. }
  327. if result.Code != 1000 {
  328. // index = w.updateUrl(index)
  329. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  330. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  331. result.Msg)
  332. }
  333. return result.Data, nil
  334. }
  335. //
  336. //func (w *serverHTTPWorker) SetToken(token string) {
  337. // w.token = token
  338. //}
  339. //
  340. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  341. // return w.doccServer, w.token
  342. //}