serverWorker.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. package worker
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. . "github.com/coroot/coroot-node-agent/utils/modelse"
  6. log "github.com/sirupsen/logrus"
  7. "net/http"
  8. "sync"
  9. //
  10. //log "github.com/sirupsen/logrus"
  11. )
  12. const (
  13. registerModel = 0
  14. connectTestModel = iota
  15. syncAgentInfoModel
  16. heartModel
  17. receiveTaskModel
  18. reportTaskResultModel
  19. installReportModel
  20. urlModel = "%s/api/ext/gaia/daemon/%s"
  21. urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
  22. )
  23. type ServerWorker interface {
  24. //InstallReport(r ReportRequest) error
  25. RegisterApp(RegisterAppReq) error
  26. WhiteList(WhiteListReq) (WhiteData, error)
  27. //SyncAgentInfo(RegistRequest) ([]AgentList, error)
  28. //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
  29. //ReceiveTask([]interface{}) ([]Task, error)
  30. //ReportTaskResult([]TaskResult) ([]byte, error)
  31. //SetToken(token string)
  32. //GetInfo() (string, string)
  33. }
  34. func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
  35. log.Infof("RegisterApp request:%v.", request)
  36. _, err := w.requestServer("/v2/app/create", request)
  37. if err != nil {
  38. return err
  39. }
  40. return nil
  41. }
  42. func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
  43. log.Infof("WhiteList request:%v.", request)
  44. response := WhiteData{}
  45. //log.Infof("WhiteList request:%s.", string(data))
  46. result, err := w.requestServer("/api/v1/agent/whitelist", request)
  47. if err != nil {
  48. return response, err
  49. }
  50. err = json.Unmarshal(result, &response)
  51. //fmt.Println(string(result))
  52. if err != nil {
  53. log.Errorf("WhiteList unmarshal response error:%s.", err)
  54. return response, err
  55. }
  56. return response, nil
  57. }
  58. type ServerHTTPWorker struct {
  59. currentIndex int // 标识当前使用的是那个链接方式
  60. countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
  61. mux *sync.Mutex
  62. accountID int
  63. hostID string
  64. registUrl string
  65. installReport string
  66. heartUrl string
  67. taskUrl string
  68. returnTaskUrl string
  69. installAgentUrl string
  70. token string
  71. connectTestUrl string
  72. doccServer string
  73. daemonId string
  74. connectList []string
  75. tokenList []string
  76. proxyClient *Client
  77. }
  78. func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
  79. s := &ServerHTTPWorker{
  80. accountID: 1,
  81. //daemonId: daemonId,
  82. //currentIndex: -1,
  83. //connectList: strings.Split(connect, ","),
  84. //tokenList: strings.Split(token, ","),
  85. mux: new(sync.Mutex),
  86. }
  87. s.proxyClient, _ = GetProxyClient()
  88. return s, nil
  89. }
  90. // func (w *serverHTTPWorker) updateUrl(index int) int {
  91. // w.mux.Lock()
  92. // defer w.mux.Unlock()
  93. // // if index != w.currentIndex {
  94. // // return w.currentIndex
  95. // // }
  96. // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
  97. // connect := ""
  98. // length := len(w.tokenList)
  99. // if length > 0 {
  100. // w.token = w.tokenList[0]
  101. // }
  102. //
  103. // // if !strings.HasSuffix(connect, "http") {
  104. // // connect = fmt.Sprintf("http://%s", connect)
  105. // // }
  106. //
  107. // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
  108. // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
  109. // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
  110. // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
  111. // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
  112. // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
  113. // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
  114. // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
  115. // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
  116. // return w.currentIndex
  117. // }
  118. //
  119. // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
  120. // response := RegistorRespData{}
  121. // data, err := json.Marshal(r)
  122. // if err != nil {
  123. // log.Errorf("register marshal request error:%s.", err)
  124. // return response, err
  125. // }
  126. // log.Infof("register request %s", string(data))
  127. // result, err := w.requestServer(registerModel, data)
  128. // if err != nil {
  129. // return response, err
  130. // }
  131. //
  132. // err = json.Unmarshal(result, &response)
  133. // if err != nil {
  134. // log.Errorf("register unmarshal response error:%s.", err)
  135. // return response, err
  136. // }
  137. //
  138. // return response, nil
  139. // }
  140. //
  141. // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
  142. // data, err := json.Marshal(r)
  143. // if err != nil {
  144. // log.Errorf("install report marshal request error:%s.", err)
  145. // return err
  146. // }
  147. // log.Infof("install report request %s", string(data))
  148. // if _, err := w.requestServer(installReportModel, data); err != nil {
  149. // return err
  150. // }
  151. // return nil
  152. // }
  153. //
  154. // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
  155. // data, err := json.Marshal(r)
  156. // if err != nil {
  157. // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
  158. // return err
  159. // }
  160. // log.Infof("ConnectTest request:%s.", string(data))
  161. // _, err = w.requestServer(connectTestModel, data)
  162. // return err
  163. // }
  164. //
  165. // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
  166. // response := RegistorRespData{}
  167. // data, err := json.Marshal(r)
  168. // if err != nil {
  169. // log.Errorf("sync agent runner serialize error:%s.", err)
  170. // return response.Agents, err
  171. // }
  172. // log.Infof("sync agent runner request:%s.", string(data))
  173. // result, err := w.requestServer(syncAgentInfoModel, data)
  174. // if err != nil {
  175. // return response.Agents, err
  176. // }
  177. //
  178. // err = json.Unmarshal(result, &response)
  179. // if err != nil {
  180. // log.Errorf("sync agent runner parser error return:%s.", err)
  181. // return response.Agents, err
  182. // }
  183. //
  184. // return response.Agents, nil
  185. // }
  186. //
  187. // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
  188. // log.Infof("heartbeat request:%v.", request)
  189. // response := HB_Config{}
  190. // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
  191. // data, err := json.Marshal(request)
  192. // if err != nil {
  193. // log.Errorf("heartbeat marshal request error:%s.", err)
  194. // return response.HB_Config_Detail, err
  195. // }
  196. //
  197. // log.Infof("heartbeat request:%s.", string(data))
  198. // result, err := w.requestServer(heartModel, data)
  199. // if err != nil {
  200. // return response.HB_Config_Detail, err
  201. // }
  202. //
  203. // err = json.Unmarshal(result, &response)
  204. // if err != nil {
  205. // log.Errorf("heartbeat unmarshal response error:%s.", err)
  206. // return response.HB_Config_Detail, err
  207. // }
  208. // return response.HB_Config_Detail, nil
  209. // }
  210. //
  211. // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
  212. // defer func() {
  213. // if err := recover(); err != nil {
  214. // log.Errorf("get task panic, error is <%v>", err)
  215. // }
  216. // }()
  217. // data, err := json.Marshal(request)
  218. // if err != nil {
  219. // log.Errorf("get task serialize get task request, error is <%s>", err)
  220. // return nil, err
  221. // }
  222. //
  223. // log.Infof("get task request:%s.", string(data))
  224. //
  225. // result := RespJson{}
  226. // // index := w.getIndex()
  227. // // for i := 0; i <= 0; i++ {
  228. // url := w.taskUrl
  229. // // fmt.Println("ReceiveTask:", url)
  230. // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
  231. // map[string]string{"token": w.token, "Content-Type": "application/json"})
  232. // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
  233. // if err != nil {
  234. // log.WithError(err).Errorf("proxy server error")
  235. // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
  236. // }
  237. // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
  238. //
  239. // err = json.Unmarshal(response, &result)
  240. // if err != nil {
  241. // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  242. // url, err, string(response))
  243. // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
  244. // url, err, string(response))
  245. //
  246. // }
  247. //
  248. // if result.Code != 100000 {
  249. // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  250. // url, result.Code, result.Msg)
  251. // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
  252. // url, result.Code, result.Msg)
  253. // }
  254. //
  255. // if err != nil {
  256. // log.Errorf("failed with request task, error is <%v>", err)
  257. // return nil, err
  258. // }
  259. //
  260. // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
  261. // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
  262. // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
  263. //
  264. // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
  265. // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
  266. // if err != nil {
  267. // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
  268. // return nil, err
  269. // }
  270. //
  271. // log.Infof("task info is <%s>", string(plainText))
  272. // err = json.Unmarshal(plainText, &tasks)
  273. // if err != nil {
  274. // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
  275. // return nil, err
  276. // }
  277. //
  278. // return tasks, nil
  279. // }
  280. //
  281. // err = json.Unmarshal(result.Data, &tasks)
  282. // if err != nil {
  283. // log.Errorf("failed with json unmarshal, error is <%v>", err)
  284. // return nil, err
  285. // }
  286. // return tasks, nil
  287. // }
  288. //
  289. // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
  290. // data, err := json.Marshal(taskResult)
  291. // if err != nil {
  292. // log.Errorf("report task result serialize error:%s.", err)
  293. // return nil, err
  294. // }
  295. // log.Infof("report task result request:%s.", string(data))
  296. //
  297. // data, err = w.requestServer(reportTaskResultModel, data)
  298. // return data, err
  299. // }
  300. //
  301. // func (w *serverHTTPWorker) getIndex() int {
  302. // w.mux.Lock()
  303. // defer w.mux.Unlock()
  304. // return w.currentIndex
  305. // }
  306. func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
  307. byteData, err := json.Marshal(data)
  308. if err != nil {
  309. log.Errorf("heartbeat marshal request error:%s.", err)
  310. return byteData, err
  311. }
  312. response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
  313. map[string]string{"Content-Type": "application/json"})
  314. if err != nil {
  315. log.WithError(err).Errorf("proxy server error")
  316. // index = w.updateUrl(index)
  317. return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
  318. }
  319. log.Infof("url %s response %s", uri, string(response))
  320. result := RespJson{}
  321. err = json.Unmarshal(response, &result)
  322. if err != nil {
  323. log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
  324. // index = w.updateUrl(index)
  325. return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
  326. string(response))
  327. }
  328. if result.Code != 1000 {
  329. // index = w.updateUrl(index)
  330. log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
  331. return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
  332. result.Msg)
  333. }
  334. return result.Data, nil
  335. }
  336. //
  337. //func (w *serverHTTPWorker) SetToken(token string) {
  338. // w.token = token
  339. //}
  340. //
  341. //func (w *serverHTTPWorker) GetInfo() (string, string) {
  342. // return w.doccServer, w.token
  343. //}