| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- package worker
- import (
- "encoding/json"
- "fmt"
- "github.com/coroot/coroot-node-agent/flags"
- "github.com/coroot/coroot-node-agent/utils"
- . "github.com/coroot/coroot-node-agent/utils/modelse"
- log "github.com/sirupsen/logrus"
- "net/http"
- "sync"
- //
- //log "github.com/sirupsen/logrus"
- )
- const (
- registerModel = 0
- connectTestModel = iota
- syncAgentInfoModel
- heartModel
- receiveTaskModel
- reportTaskResultModel
- installReportModel
- urlModel = "%s/api/ext/gaia/daemon/%s"
- urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
- )
- type ServerWorker interface {
- //InstallReport(r ReportRequest) error
- RegisterHost(RegisterHostReq) error
- RegisterApp(RegisterAppReq) error
- WhiteList(WhiteListReq) (WhiteData, error)
- WhiteListV2(WhiteListReq) (WhiteDataV2, error)
- //SyncAgentInfo(RegistRequest) ([]AgentList, error)
- //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
- //ReceiveTask([]interface{}) ([]Task, error)
- //ReportTaskResult([]TaskResult) ([]byte, error)
- //SetToken(token string)
- //GetInfo() (string, string)
- }
- func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error {
- log.Infof("[server register host] request:%v.", utils.ToString(request))
- result, err := w.requestServer("/v2/app/registerHost", request)
- log.Infof("[server register host] resp:%v.", string(result))
- if err != nil {
- log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
- return err
- }
- return nil
- }
- func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
- log.Infof("[server register app] request:%v.", utils.ToString(request))
- result, err := w.requestServer("/v2/app/create", request)
- log.Infof("[server register app] resp data:%v.", string(result))
- if err != nil {
- log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request))
- return err
- }
- return nil
- }
- func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
- log.Infof("[server whitelist] request:%v.", request.String())
- response := WhiteData{}
- result, err := w.requestServer("/api/v1/agent/whitelist", request)
- log.Infof("[server whitelist] resp data:%v.", string(result))
- if err != nil {
- return response, err
- }
- err = json.Unmarshal(result, &response)
- if err != nil {
- log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
- return response, err
- }
- return response, nil
- }
- func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) {
- log.Infof("[server whitelist] request:%v.", request.String())
- response := WhiteDataV2{}
- result, err := w.requestServer("/api/v1/agent/whitelist", request)
- log.Infof("[server whitelist] resp data:%v.", string(result))
- if err != nil {
- return response, err
- }
- err = json.Unmarshal(result, &response)
- if err != nil {
- log.WithError(err).Errorf("[server whitelist] Failed RegisterApp request:%v.", request.String())
- return response, err
- }
- return response, nil
- }
- type ServerHTTPWorker struct {
- currentIndex int // 标识当前使用的是那个链接方式
- countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
- mux *sync.Mutex
- accountID int
- hostID string
- registUrl string
- installReport string
- heartUrl string
- taskUrl string
- returnTaskUrl string
- installAgentUrl string
- token string
- connectTestUrl string
- doccServer string
- daemonId string
- connectList []string
- tokenList []string
- //nodeInfo *NodeInfo
- prefix string
- proxyClient *Client
- }
- func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
- s := &ServerHTTPWorker{
- accountID: 1,
- //daemonId: daemonId,
- //currentIndex: -1,
- //connectList: strings.Split(connect, ","),
- //tokenList: strings.Split(token, ","),
- mux: new(sync.Mutex),
- }
- s.prefix = *flags.ServerPrefix
- s.proxyClient, _ = GetProxyClient()
- //s.nodeInfo,_ = node.GetNodeInfo()
- return s, nil
- }
- // func (w *serverHTTPWorker) updateUrl(index int) int {
- // w.mux.Lock()
- // defer w.mux.Unlock()
- // // if index != w.currentIndex {
- // // return w.currentIndex
- // // }
- // w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
- // connect := ""
- // length := len(w.tokenList)
- // if length > 0 {
- // w.token = w.tokenList[0]
- // }
- //
- // // if !strings.HasSuffix(connect, "http") {
- // // connect = fmt.Sprintf("http://%s", connect)
- // // }
- //
- // w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
- // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
- // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
- // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
- // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
- // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
- // w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
- // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s",
- // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
- // return w.currentIndex
- // }
- //
- // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
- // response := RegistorRespData{}
- // data, err := json.Marshal(r)
- // if err != nil {
- // log.Errorf("register marshal request error:%s.", err)
- // return response, err
- // }
- // log.Infof("register request %s", string(data))
- // result, err := w.requestServer(registerModel, data)
- // if err != nil {
- // return response, err
- // }
- //
- // err = json.Unmarshal(result, &response)
- // if err != nil {
- // log.Errorf("register unmarshal response error:%s.", err)
- // return response, err
- // }
- //
- // return response, nil
- // }
- //
- // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
- // data, err := json.Marshal(r)
- // if err != nil {
- // log.Errorf("install report marshal request error:%s.", err)
- // return err
- // }
- // log.Infof("install report request %s", string(data))
- // if _, err := w.requestServer(installReportModel, data); err != nil {
- // return err
- // }
- // return nil
- // }
- //
- // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
- // data, err := json.Marshal(r)
- // if err != nil {
- // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
- // return err
- // }
- // log.Infof("ConnectTest request:%s.", string(data))
- // _, err = w.requestServer(connectTestModel, data)
- // return err
- // }
- //
- // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
- // response := RegistorRespData{}
- // data, err := json.Marshal(r)
- // if err != nil {
- // log.Errorf("sync agent runner serialize error:%s.", err)
- // return response.Agents, err
- // }
- // log.Infof("sync agent runner request:%s.", string(data))
- // result, err := w.requestServer(syncAgentInfoModel, data)
- // if err != nil {
- // return response.Agents, err
- // }
- //
- // err = json.Unmarshal(result, &response)
- // if err != nil {
- // log.Errorf("sync agent runner parser error return:%s.", err)
- // return response.Agents, err
- // }
- //
- // return response.Agents, nil
- // }
- //
- // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
- // log.Infof("heartbeat request:%v.", request)
- // response := HB_Config{}
- // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf
- // data, err := json.Marshal(request)
- // if err != nil {
- // log.Errorf("heartbeat marshal request error:%s.", err)
- // return response.HB_Config_Detail, err
- // }
- //
- // log.Infof("heartbeat request:%s.", string(data))
- // result, err := w.requestServer(heartModel, data)
- // if err != nil {
- // return response.HB_Config_Detail, err
- // }
- //
- // err = json.Unmarshal(result, &response)
- // if err != nil {
- // log.Errorf("heartbeat unmarshal response error:%s.", err)
- // return response.HB_Config_Detail, err
- // }
- // return response.HB_Config_Detail, nil
- // }
- //
- // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
- // defer func() {
- // if err := recover(); err != nil {
- // log.Errorf("get task panic, error is <%v>", err)
- // }
- // }()
- // data, err := json.Marshal(request)
- // if err != nil {
- // log.Errorf("get task serialize get task request, error is <%s>", err)
- // return nil, err
- // }
- //
- // log.Infof("get task request:%s.", string(data))
- //
- // result := RespJson{}
- // // index := w.getIndex()
- // // for i := 0; i <= 0; i++ {
- // url := w.taskUrl
- // // fmt.Println("ReceiveTask:", url)
- // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
- // map[string]string{"token": w.token, "Content-Type": "application/json"})
- // // response, err := HTTPRequest(http.MethodPost, url, data, w.token)
- // if err != nil {
- // log.WithError(err).Errorf("proxy server error")
- // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
- // }
- // log.Infof("url is < %s >, tasks response <%s>", url, string(response))
- //
- // err = json.Unmarshal(response, &result)
- // if err != nil {
- // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
- // url, err, string(response))
- // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
- // url, err, string(response))
- //
- // }
- //
- // if result.Code != 100000 {
- // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
- // url, result.Code, result.Msg)
- // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
- // url, result.Code, result.Msg)
- // }
- //
- // if err != nil {
- // log.Errorf("failed with request task, error is <%v>", err)
- // return nil, err
- // }
- //
- // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
- // base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
- // base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
- //
- // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
- // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
- // if err != nil {
- // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
- // return nil, err
- // }
- //
- // log.Infof("task info is <%s>", string(plainText))
- // err = json.Unmarshal(plainText, &tasks)
- // if err != nil {
- // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err)
- // return nil, err
- // }
- //
- // return tasks, nil
- // }
- //
- // err = json.Unmarshal(result.Data, &tasks)
- // if err != nil {
- // log.Errorf("failed with json unmarshal, error is <%v>", err)
- // return nil, err
- // }
- // return tasks, nil
- // }
- //
- // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
- // data, err := json.Marshal(taskResult)
- // if err != nil {
- // log.Errorf("report task result serialize error:%s.", err)
- // return nil, err
- // }
- // log.Infof("report task result request:%s.", string(data))
- //
- // data, err = w.requestServer(reportTaskResultModel, data)
- // return data, err
- // }
- //
- // func (w *serverHTTPWorker) getIndex() int {
- // w.mux.Lock()
- // defer w.mux.Unlock()
- // return w.currentIndex
- // }
- func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
- uri = w.prefix + uri
- byteData, err := json.Marshal(data)
- if err != nil {
- log.WithError(err).Errorf("[requestServer] marshal request error.")
- return byteData, err
- }
- response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
- map[string]string{"Content-Type": "application/json"})
- if err != nil {
- log.WithError(err).Errorf("[requestServer] proxy server error")
- // index = w.updateUrl(index)
- return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
- }
- log.Infof("url %s response %s", uri, string(response))
- result := RespJson{}
- err = json.Unmarshal(response, &result)
- if err != nil {
- log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
- // index = w.updateUrl(index)
- return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
- string(response))
- }
- if result.Code != 1000 {
- // index = w.updateUrl(index)
- log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
- return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
- result.Msg)
- }
- return result.Data, nil
- }
- //
- //func (w *serverHTTPWorker) SetToken(token string) {
- // w.token = token
- //}
- //
- //func (w *serverHTTPWorker) GetInfo() (string, string) {
- // return w.doccServer, w.token
- //}
|