package worker import ( "encoding/json" "fmt" "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/utils" . "github.com/coroot/coroot-node-agent/utils/modelse" log "github.com/sirupsen/logrus" "net/http" "sync" // //log "github.com/sirupsen/logrus" ) const ( registerModel = 0 connectTestModel = iota syncAgentInfoModel heartModel receiveTaskModel reportTaskResultModel installReportModel urlModel = "%s/api/ext/gaia/daemon/%s" urlModel_ = "%s/api/ext/gaia/daemon/%s/%s" ) type ServerWorker interface { //InstallReport(r ReportRequest) error RegisterHost(RegisterHostReq) error RegisterApp(RegisterAppReq) error WhiteList(WhiteListReq) (WhiteData, error) WhiteListV2(WhiteListReq) (WhiteDataV2, error) PullAllAppInfo(EbpfAppReq) (EbpfAppResp, error) GetCodeSetting(CodeSettingReq) (CodeSettingResp, error) //SyncAgentInfo(RegistRequest) ([]AgentList, error) //Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error) //ReceiveTask([]interface{}) ([]Task, error) //ReportTaskResult([]TaskResult) ([]byte, error) //SetToken(token string) //GetInfo() (string, string) } func (w *ServerHTTPWorker) GetCodeSetting(request CodeSettingReq) (CodeSettingResp, error) { //log.Infof("[get GetCodeSetting] request:%v.", utils.ToString(request)) response := CodeSettingResp{} result, err := w.requestServer("/api/v70/agent/getCodeSetting", request) //log.Infof("[get GetCodeSetting] resp data:%v.", string(result)) if err != nil { return response, err } err = json.Unmarshal(result, &response) if err != nil { log.WithError(err).Errorf("[get GetCodeSetting] Failed GetCodeSetting request:%v.", utils.ToString(request)) return response, err } return response, nil } func (w *ServerHTTPWorker) PullAllAppInfo(request EbpfAppReq) (EbpfAppResp, error) { //log.Infof("[get all appinfo] request:%v.", utils.ToString(request)) response := EbpfAppResp{} result, err := w.requestServer("/api/v70/ebpfAppInfo", request) //log.Infof("[get all appinfo] resp data:%v.", string(result)) if err != nil { return response, err } err = json.Unmarshal(result, &response) if err != nil { log.WithError(err).Errorf("[get all appinfo] Failed PullAllAppInfo request:%v.", utils.ToString(request)) return response, err } return response, nil } func (w *ServerHTTPWorker) RegisterHost(request RegisterHostReq) error { //log.Infof("[server register host] request:%v.", utils.ToString(request)) //result, err := w.requestServer("/v2/app/registerHost", request) _, err := w.requestServer("/v2/app/registerHost", request) //log.Infof("[server register host] resp:%v.", string(result)) if err != nil { log.WithError(err).Errorf("[server register] Failed RegisterHost request:%v.", utils.ToString(request)) return err } return nil } func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error { //log.Infof("[server register app] request:%v.", utils.ToString(request)) //result, err := w.requestServer("/v2/app/create", request) _, err := w.requestServer("/v2/app/create", request) //log.Infof("[server register app] resp data:%v.", string(result)) if err != nil { log.WithError(err).Errorf("[server register] Failed RegisterApp request:%v.", utils.ToString(request)) return err } return nil } func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) { //log.Infof("[server whitelist] request:%v.", request.String()) response := WhiteData{} result, err := w.requestServer("/api/v1/agent/whitelist", request) //log.Infof("[server whitelist] resp data:%v.", string(result)) if err != nil { return response, err } err = json.Unmarshal(result, &response) if err != nil { log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteList request:%v.", request.String()) return response, err } return response, nil } func (w *ServerHTTPWorker) WhiteListV2(request WhiteListReq) (WhiteDataV2, error) { //log.Infof("[server whitelist] request:%v.", request.String()) response := WhiteDataV2{} result, err := w.requestServer("/api/v1/agent/whitelist", request) //log.Infof("[server whitelist] resp data:%v.", string(result)) if err != nil { return response, err } err = json.Unmarshal(result, &response) if err != nil { log.WithError(err).Errorf("[server whitelist] Failed Pull WhiteListV2 request:%v.", request.String()) return response, err } return response, nil } type ServerHTTPWorker struct { currentIndex int // 标识当前使用的是那个链接方式 countTotal int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮 mux *sync.Mutex accountID int hostID string registUrl string installReport string heartUrl string taskUrl string returnTaskUrl string installAgentUrl string token string connectTestUrl string doccServer string daemonId string connectList []string tokenList []string //nodeInfo *NodeInfo prefix string proxyClient *Client } func NewServerHTTPWorker() (*ServerHTTPWorker, error) { s := &ServerHTTPWorker{ accountID: 1, //daemonId: daemonId, //currentIndex: -1, //connectList: strings.Split(connect, ","), //tokenList: strings.Split(token, ","), mux: new(sync.Mutex), } s.prefix = *flags.ServerPrefix s.proxyClient, _ = GetProxyClient() //s.nodeInfo,_ = node.GetNodeInfo() return s, nil } // func (w *serverHTTPWorker) updateUrl(index int) int { // w.mux.Lock() // defer w.mux.Unlock() // // if index != w.currentIndex { // // return w.currentIndex // // } // w.currentIndex = (w.currentIndex + 1) % len(w.connectList) // connect := "" // length := len(w.tokenList) // if length > 0 { // w.token = w.tokenList[0] // } // // // if !strings.HasSuffix(connect, "http") { // // connect = fmt.Sprintf("http://%s", connect) // // } // // w.registUrl = fmt.Sprintf(urlModel, connect, "regist") // w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat") // w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId) // w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId) // w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents") // w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest") // w.installReport = fmt.Sprintf(urlModel, connect, "installreport") // log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s, installReport: %s", // w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport) // return w.currentIndex // } // // func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) { // response := RegistorRespData{} // data, err := json.Marshal(r) // if err != nil { // log.Errorf("register marshal request error:%s.", err) // return response, err // } // log.Infof("register request %s", string(data)) // result, err := w.requestServer(registerModel, data) // if err != nil { // return response, err // } // // err = json.Unmarshal(result, &response) // if err != nil { // log.Errorf("register unmarshal response error:%s.", err) // return response, err // } // // return response, nil // } // // func (w *serverHTTPWorker) InstallReport(r ReportRequest) error { // data, err := json.Marshal(r) // if err != nil { // log.Errorf("install report marshal request error:%s.", err) // return err // } // log.Infof("install report request %s", string(data)) // if _, err := w.requestServer(installReportModel, data); err != nil { // return err // } // return nil // } // // func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error { // data, err := json.Marshal(r) // if err != nil { // log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err) // return err // } // log.Infof("ConnectTest request:%s.", string(data)) // _, err = w.requestServer(connectTestModel, data) // return err // } // // func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) { // response := RegistorRespData{} // data, err := json.Marshal(r) // if err != nil { // log.Errorf("sync agent runner serialize error:%s.", err) // return response.Agents, err // } // log.Infof("sync agent runner request:%s.", string(data)) // result, err := w.requestServer(syncAgentInfoModel, data) // if err != nil { // return response.Agents, err // } // // err = json.Unmarshal(result, &response) // if err != nil { // log.Errorf("sync agent runner parser error return:%s.", err) // return response.Agents, err // } // // return response.Agents, nil // } // // func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) { // log.Infof("heartbeat request:%v.", request) // response := HB_Config{} // // Mac本上: P_resource_occupancy[0].cpu -> +Inf -> json: unsupported value: +Inf // data, err := json.Marshal(request) // if err != nil { // log.Errorf("heartbeat marshal request error:%s.", err) // return response.HB_Config_Detail, err // } // // log.Infof("heartbeat request:%s.", string(data)) // result, err := w.requestServer(heartModel, data) // if err != nil { // return response.HB_Config_Detail, err // } // // err = json.Unmarshal(result, &response) // if err != nil { // log.Errorf("heartbeat unmarshal response error:%s.", err) // return response.HB_Config_Detail, err // } // return response.HB_Config_Detail, nil // } // // func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) { // defer func() { // if err := recover(); err != nil { // log.Errorf("get task panic, error is <%v>", err) // } // }() // data, err := json.Marshal(request) // if err != nil { // log.Errorf("get task serialize get task request, error is <%s>", err) // return nil, err // } // // log.Infof("get task request:%s.", string(data)) // // result := RespJson{} // // index := w.getIndex() // // for i := 0; i <= 0; i++ { // url := w.taskUrl // // fmt.Println("ReceiveTask:", url) // response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data, // map[string]string{"token": w.token, "Content-Type": "application/json"}) // // response, err := HTTPRequest(http.MethodPost, url, data, w.token) // if err != nil { // log.WithError(err).Errorf("proxy server error") // return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err) // } // log.Infof("url is < %s >, tasks response <%s>", url, string(response)) // // err = json.Unmarshal(response, &result) // if err != nil { // log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", // url, err, string(response)) // return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", // url, err, string(response)) // // } // // if result.Code != 100000 { // log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", // url, result.Code, result.Msg) // return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", // url, result.Code, result.Msg) // } // // if err != nil { // log.Errorf("failed with request task, error is <%v>", err) // return nil, err // } // // if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil { // base64Data := bytes.TrimSuffix(result.Data, []byte("\"")) // base64Data = bytes.TrimPrefix(base64Data, []byte("\"")) // // decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data)) // plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2) // if err != nil { // log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err) // return nil, err // } // // log.Infof("task info is <%s>", string(plainText)) // err = json.Unmarshal(plainText, &tasks) // if err != nil { // log.Errorf("failed with json unmarshal, task info is <%s>, error is <%v>", string(plainText), err) // return nil, err // } // // return tasks, nil // } // // err = json.Unmarshal(result.Data, &tasks) // if err != nil { // log.Errorf("failed with json unmarshal, error is <%v>", err) // return nil, err // } // return tasks, nil // } // // func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) { // data, err := json.Marshal(taskResult) // if err != nil { // log.Errorf("report task result serialize error:%s.", err) // return nil, err // } // log.Infof("report task result request:%s.", string(data)) // // data, err = w.requestServer(reportTaskResultModel, data) // return data, err // } // // func (w *serverHTTPWorker) getIndex() int { // w.mux.Lock() // defer w.mux.Unlock() // return w.currentIndex // } func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) { uri = w.prefix + uri byteData, err := json.Marshal(data) if err != nil { log.WithError(err).Errorf("[requestServer] marshal request error.") return byteData, err } response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData, map[string]string{"Content-Type": "application/json"}) if err != nil { log.WithError(err).Errorf("[requestServer] proxy server error") // index = w.updateUrl(index) return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err) } log.Infof("url %s response %s", uri, string(response)) result := RespJson{} err = json.Unmarshal(response, &result) if err != nil { log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response)) // index = w.updateUrl(index) return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response)) } if result.Code != 1000 { // index = w.updateUrl(index) log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg) return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg) } return result.Data, nil } // //func (w *serverHTTPWorker) SetToken(token string) { // w.token = token //} // //func (w *serverHTTPWorker) GetInfo() (string, string) { // return w.doccServer, w.token //}