Переглянути джерело

Fixed #TASK_QT-9810 对接apm白名单-应用注册

Carl 1 рік тому
батько
коміт
290d9995ad

+ 5 - 3
Makefile

@@ -11,6 +11,10 @@ ifdef pid
 	endif
 endif
 
+ifeq ($(send),1)
+FILTER+= SEND=1
+endif
+
 all: c-build go-build
 
 build:
@@ -24,6 +28,4 @@ go-build:
 go: go-build run
 
 run:
-	ssh [email protected] 'cd /opt/github/euspace && TRACES_ENDPOINT=http://10.0.12.192:18080/docp/api/v2/data/receive ${FILTER} ./euspace --listen="0.0.0.0:8123"'
-send_run:
-	ssh [email protected] 'cd /opt/github/euspace && SEND=1 TRACES_ENDPOINT=http://10.0.12.192:18080/docp/api/v2/data/receive ${FILTER} ./euspace --listen="0.0.0.0:8123"'
+	ssh [email protected] 'cd /opt/github/euspace && CONFIG_ENDPOINT=10.0.16.250:18080 && TRACES_ENDPOINT=http://10.0.16.250:18080/docp/api/v2/data/receive ${FILTER} ./euspace --listen="0.0.0.0:8123"'

+ 8 - 4
common/app_info.go

@@ -8,8 +8,12 @@ const (
 )
 
 type AppInfo struct {
-	AppName    string `json:"app_name"`
-	AppId      int64  `json:"app_id"`
-	AgentId    int64  `json:"agent_id"`
-	InstanceId int64  `json:"instance_id"`
+	AppName     string   `json:"app_name"`
+	AppId       int64    `json:"app_id"`
+	AgentId     int64    `json:"agent_id"`
+	InstanceId  int64    `json:"instance_id"`
+	Sn          string   `json:"sn"`
+	Sport       int      `json:"sport"`
+	ServiceName string   `json:"service_name"`
+	CodeType    CodeType `json:"code_type"`
 }

+ 37 - 0
common/code_type.go

@@ -61,6 +61,43 @@ func (p CodeType) String() string {
 	}
 }
 
+func (p CodeType) ServiceTypeString() string {
+	switch p {
+	case CodeTypeGo:
+		return "GO"
+	case CodeTypeJava:
+		return "JAVA"
+	case CodeTypeJavaAot:
+		return "JAVA"
+	case CodeTypePHP:
+		return "PHP"
+	case CodeTypePython:
+		return "PYTHON"
+	case CodeTypeDotNet:
+		return "DOTNET"
+	case CodeTypeNode:
+		return "NODE"
+	case CodeTypeC:
+		return "C"
+	case CodeTypeNetCore:
+		return "NETCORE"
+	case CodeTypeNetCoreAot:
+		return "NETCORE_AOT"
+	case CodeTypeLua:
+		return "LUA"
+	case CodeTypeJavaC:
+		return "JAVA_C"
+	case CodeTypeRuby:
+		return "RUBY"
+	case CodeTypeWaitCheck:
+		return "WAIT_CHECK"
+	case CodeTypeUnknown:
+		return "UNKNOWN:Language"
+	default:
+		return "UNKNOWN:Language"
+	}
+}
+
 func (p CodeType) Topic() string {
 	switch p {
 	case CodeTypeGo:

+ 29 - 0
common/filter.go

@@ -0,0 +1,29 @@
+package common
+
+import (
+	"os"
+	"strconv"
+)
+
+var FILTER_PID_KEY = "FILTER_PID"
+
+var INT_FILTER_PID = 0
+
+func init() {
+	filterPid, err := strconv.ParseInt(os.Getenv(FILTER_PID_KEY), 10, 64)
+	if err == nil {
+		INT_FILTER_PID = int(filterPid)
+	}
+}
+
+func IsOpenFilter() bool {
+	return INT_FILTER_PID != 0
+}
+
+func GetFilterPid() (int, bool) {
+	return INT_FILTER_PID, INT_FILTER_PID == 0
+}
+
+func IsFilterPid(filterPid uint32) bool {
+	return int(filterPid) == INT_FILTER_PID
+}

+ 89 - 0
containers/apm_register_app.go

@@ -0,0 +1,89 @@
+package containers
+
+import (
+	"fmt"
+	"github.com/coroot/coroot-node-agent/utils"
+	. "github.com/coroot/coroot-node-agent/utils/modelse"
+)
+
+func (c *Container) RegisterAppInfo(r *Registry, pid uint32) error {
+	if c.WhiteSettingInfo.AppName == "" {
+		return fmt.Errorf("AppName is empty")
+	}
+	var err error
+	originAppName := c.AppInfo.AppName
+	whiteAppName := c.WhiteSettingInfo.AppName
+	if originAppName != whiteAppName {
+		c.AppInfo.AppId, err = utils.BuildInt64ID(whiteAppName).ToInt64()
+		if err != nil {
+			return err
+		}
+		c.AppInfo.ServiceName = c.AppInfo.CodeType.ServiceTypeString()
+		// 注册
+		fmt.Println("注册")
+		hostId, _ := utils.GetHostID()
+		registerAppReq := RegisterAppReq{
+			AppId:       c.AppInfo.AppId,
+			AppName:     c.WhiteSettingInfo.AppName,
+			AccountId:   110,
+			AgentId:     c.AppInfo.AgentId,
+			Sn:          c.AppInfo.Sn,
+			Sport:       c.AppInfo.Sport,
+			ServiceType: c.AppInfo.ServiceName,
+			CodeType:    c.AppInfo.CodeType.Int(),
+			App_type:    1,
+			HostId:      hostId,
+		}
+		registerAppReq.Print()
+
+		err = r.connServer.RegisterApp(registerAppReq)
+		if err != nil {
+			return err
+		}
+
+		c.AppInfo.AppName = c.WhiteSettingInfo.AppName
+
+	}
+
+	// ip:port + process_name + exe路径生成
+	//c.instanceID.IntVal + proc.GetExe(pid)
+	//c.AppInfo.AgentId =
+
+	return nil
+}
+
+func (c *Container) TestRegisterAppInfo(r *Registry) error {
+	var err error
+	c.AppInfo.AppName = "eBPF-APP"
+	c.AppInfo.AppId, err = utils.BuildInt64ID(c.AppInfo.AppName).ToInt64()
+	if err != nil {
+		return err
+	}
+	c.AppInfo.ServiceName = c.AppInfo.CodeType.ServiceTypeString()
+	// 注册
+	hostId, _ := utils.GetHostID()
+	registerAppReq := RegisterAppReq{
+		AppId:       c.AppInfo.AppId,
+		AppName:     c.AppInfo.AppName,
+		AccountId:   110,
+		AgentId:     c.AppInfo.AgentId,
+		Sn:          c.AppInfo.Sn,
+		Sport:       c.AppInfo.Sport,
+		ServiceType: c.AppInfo.ServiceName,
+		CodeType:    c.AppInfo.CodeType.Int(),
+		App_type:    1,
+		HostId:      hostId,
+	}
+	registerAppReq.Print()
+
+	err = r.connServer.RegisterApp(registerAppReq)
+	if err != nil {
+		return err
+	}
+
+	// ip:port + process_name + exe路径生成
+	//c.instanceID.IntVal + proc.GetExe(pid)
+	//c.AppInfo.AgentId =
+
+	return nil
+}

+ 11 - 1
containers/white_list.go → containers/apm_white_list.go

@@ -1,7 +1,9 @@
 package containers
 
 import (
+	"fmt"
 	"github.com/coroot/coroot-node-agent/common"
+	"github.com/coroot/coroot-node-agent/utils"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	log "github.com/sirupsen/logrus"
 )
@@ -23,7 +25,13 @@ func (r *Registry) setWhiteList(whiteData WhiteData) {
 	r.whiteListRules = whiteListMap
 }
 
-func (r *Registry) connWhiteList() (bool, error) {
+func (r *Registry) getWhiteList() (bool, error) {
+	if common.IsOpenFilter() {
+		return false, nil
+	}
+	utils.GetHostID()
+	hostId, _ := utils.GetHostID()
+	fmt.Println(hostId)
 	whiteListReq := WhiteListReq{
 		HostId:    10154813500555812,
 		AccountId: 110,
@@ -35,6 +43,8 @@ func (r *Registry) connWhiteList() (bool, error) {
 		return false, err
 	}
 
+	//fmt.Println(r.whiteLastUpdatedTime)
+	//fmt.Println(whiteData.LastUpdatedTime)
 	// 不用更新
 	if r.whiteLastUpdatedTime >= 0 && r.whiteLastUpdatedTime == whiteData.LastUpdatedTime {
 		return false, nil

+ 8 - 15
containers/container.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	"os"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -31,7 +30,7 @@ import (
 )
 
 var (
-	gcInterval  = 1 * time.Minute
+	gcInterval  = 10 * time.Second
 	pingTimeout = 300 * time.Millisecond
 )
 
@@ -1099,11 +1098,11 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 }
 
 func (c *Container) attachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
-	fmt.Println("=============attachUprobes")
 	if tracer.DisableL7Tracing() {
 		return nil
 	}
 	codeType := c.GetCodeTypeFromCache(pid)
+	fmt.Println("=============attachUprobes", codeType.String())
 	if codeType.IsUnknownCode() {
 		return nil
 	}
@@ -1164,12 +1163,8 @@ func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) erro
 }
 
 func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
-	ENV_PID := os.Getenv("FILTER_PID")
-	if ENV_PID != "" {
-		filterPid, _ := strconv.ParseInt(ENV_PID, 10, 64)
-		if filterPid != int64(pid) {
-			return nil
-		}
+	if common.IsOpenFilter() && !common.IsFilterPid(pid) {
+		return nil
 	}
 	p := c.processes[pid]
 	if p == nil {
@@ -1193,6 +1188,8 @@ func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) erro
 		}
 		p.uprobes = append(p.uprobes, libNetProbes...)
 		p.jvmUprobesChecked = true
+	} else {
+		fmt.Println("[attach] already attach.")
 	}
 	return nil
 }
@@ -1205,12 +1202,8 @@ func (c *Container) errorClose(pid uint32) {
 }
 
 func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
-	ENV_PID := os.Getenv("FILTER_PID")
-	if ENV_PID != "" {
-		filterPid, _ := strconv.ParseInt(ENV_PID, 10, 64)
-		if filterPid != int64(pid) {
-			return nil
-		}
+	if common.IsOpenFilter() && !common.IsFilterPid(pid) {
+		return nil
 	}
 	p := c.processes[pid]
 	if p == nil {

+ 16 - 33
containers/container_apm.go

@@ -4,6 +4,7 @@ import (
 	"bufio"
 	"debug/elf"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
@@ -99,7 +100,8 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			if err == nil {
 				method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
 				ip, _ := netaddr.ParseIP(hostIp)
-				trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port), c.GetCodeTypeFromCache(pid))
+				//codeType := c.GetCodeTypeFromCache(pid)
+				trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port), pid, c.GetAppInfo())
 				c.SendEvent(trace, r.TraceId)
 			}
 
@@ -225,7 +227,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	return nil
 }
 
-func (c *Container) buildIDs(pid uint32) {
+func (c *Container) buildIDs(pid uint32) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	p := c.processes[pid]
@@ -239,16 +241,20 @@ func (c *Container) buildIDs(pid uint32) {
 				// 获取端口号
 				port := address.Port()
 				//c.instanceID.IntVal, c.instanceID.HashtVal, _ =
-				strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip, port))
+				c.AppInfo.Sn = ip.String()
+				c.AppInfo.Sport = int(port)
+				strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
 				c.instanceID.IntVal, _ = strInstanceID.ToInt64()
 				c.instanceID.HashtVal = strInstanceID.ToHashByte()
 				c.AppInfo.InstanceId = c.instanceID.IntVal
 				strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
 				c.AppInfo.AgentId, _ = strAgentID.ToInt64()
-				break
+				c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
+				return true
 			}
 		}
 	}
+	return false
 }
 
 func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
@@ -411,6 +417,10 @@ func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tr
 	return
 }
 
+func (c *Container) GetAppInfo() common.AppInfo {
+	return c.AppInfo
+}
+
 func (c *Container) eventReady() {
 	c.lock.Lock()
 	defer c.lock.Unlock()
@@ -434,17 +444,14 @@ func (c *Container) l7AttachSuccess() {
 }
 
 func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool {
-
 	p := c.processes[pid]
 	if p != nil && c.checkEventReady() {
 		codeType := c.GetCodeTypeFromCache(pid)
 		cmdline := p.GetCmdline()
-		fmt.Printf("checkEventReady %v [%d] cmdline='%s' len(cmdline)[%d]\n", c.checkEventReady(), pid, cmdline, len(cmdline))
 
 		if len(cmdline) == 0 {
 			return false
 		}
-		fmt.Println("r.getWhiteListByCodeType(codeType):")
 		fmt.Println(r.getWhiteListByCodeType(codeType))
 		// 当前语言的白名单规则
 		for _, setting := range r.getWhiteListByCodeType(codeType) {
@@ -453,12 +460,10 @@ func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool {
 				continue
 			}
 			fmt.Println("strings.Contains(cmdline, ruleVal)")
-			fmt.Println(cmdline)
-			fmt.Println(ruleVal)
 			// 判断规则
 			if strings.Contains(cmdline, ruleVal) {
-				fmt.Println("------------", pid, ruleVal)
 				c.WhiteSettingInfo = setting
+				fmt.Printf("checkEventReady %v [%d] cmdline='%s' len(cmdline)[%d] %s\n", c.checkEventReady(), pid, cmdline, len(cmdline), ruleVal)
 				return true
 			}
 		}
@@ -466,35 +471,13 @@ func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool {
 	return false
 }
 
-func (c *Container) RegisterAppInfo(pid uint32) error {
-	if c.WhiteSettingInfo.AppName == "" {
-		return fmt.Errorf("AppName is empty")
-	}
-	var err error
-	originAppName := c.AppInfo.AppName
-	if originAppName != c.WhiteSettingInfo.AppName {
-		c.AppInfo.AppName = c.WhiteSettingInfo.AppName
-		c.AppInfo.AppId, err = utils.BuildInt64ID(c.AppInfo.AppName).ToInt64()
-
-		// 注册
-		fmt.Println("注册")
-	}
-
-	// ip:port + process_name + exe路径生成
-	//c.instanceID.IntVal + proc.GetExe(pid)
-	//c.AppInfo.AgentId =
-
-	return err
-
-}
-
 func (c *Container) detachUprobes(pid uint32) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	// close uprobe
 	if p := c.processes[pid]; p != nil {
 		if len(p.uprobes) > 0 {
-			fmt.Println("卸载")
+			fmt.Println("卸载---", pid)
 			p.DynamicClose()
 			c.l7Attach = false
 		}

+ 57 - 56
containers/registry.go

@@ -123,13 +123,13 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
 		whiteListRules: make(WhiteListMap),
 	}
 	// 初始化软负载集群节点
-	proxyClient, clientErr := NewProxyClient("10.0.12.192:18080", false)
+	proxyClient, clientErr := NewProxyClient(os.Getenv("CONFIG_ENDPOINT"), false)
 	if clientErr == nil {
 		// 负载健康检测
 		try.Go(proxyClient.CheckEndpoints, CatchFn)
 		log.Infof("New Proxy Client success.config_server is [%s]", "")
 	} else {
-		log.WithError(clientErr).Errorf("New Proxy Client error")
+		log.WithError(clientErr).Errorf("New Proxy Client error,Please check export CONFIG_ENDPOINT=")
 		return nil, clientErr
 	}
 
@@ -143,10 +143,10 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
 		return nil, err
 	}
 
-	_, err = r.connWhiteList()
-	if err != nil {
-		return nil, err
-	}
+	//_, err = r.getWhiteList()
+	//if err != nil {
+	//	return nil, err
+	//}
 
 	go r.handleEvents(r.events)
 	if err = r.tracer.Run(r.events); err != nil {
@@ -181,47 +181,46 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 		select {
 		case now := <-gcTicker.C:
 			// todo IDS
-			updateFlag, err := r.connWhiteList()
+			_, err := r.getWhiteList()
 			if err != nil {
 				log.WithError(err).Errorf("connWhiteList error")
 			}
-			if updateFlag && err == nil {
-				for pid, c := range r.containersByPid {
-					// attach
-					if c.verifyAttachConditions(r, pid) {
-						fmt.Println("verifyAttachConditions ok-------", pid)
-						c.GetCodeTypeFromCache(pid)
-						err := c.RegisterAppInfo(pid)
-						if err != nil {
+			for pid, c := range r.containersByPid {
+				if !common.IsOpenFilter() {
+					verifyAttachConditions := c.verifyAttachConditions(r, pid)
+					if verifyAttachConditions {
+						err = c.RegisterAppInfo(r, pid)
+						if err == nil {
+							fmt.Println("start attcah -------", pid)
+							err = c.attachUprobes(r.tracer, pid)
+							err = c.stackTrace(r.tracer, pid)
+							if err != nil {
+								klog.Errorf("Stack trace error", err)
+							}
+						} else {
 							klog.Warningln(err)
-							continue
 						}
-						fmt.Println("start attcah -------", pid)
-						err = c.attachUprobes(r.tracer, pid)
-						fmt.Println(err)
-						fmt.Println("attcah ok -------", pid)
-						//err := c.stackTrace(r.tracer, pid)
-						//if err != nil {
-						//	klog.Errorf("Stack trace error", err)
-						//}
-					} else {
+					}
+
+					if !verifyAttachConditions && c.checkL7AttachReady() {
 						// detach
 						c.detachUprobes(pid)
 					}
+				}
 
-					cg, err := proc.ReadCgroup(pid)
-					if err != nil {
-						delete(r.containersByPid, pid)
-						if c != nil {
-							c.onProcessExit(pid, false)
-						}
-						continue
-					}
-					if c != nil && cg.Id != c.cgroup.Id {
-						delete(r.containersByPid, pid)
+				cg, err := proc.ReadCgroup(pid)
+				if err != nil {
+					delete(r.containersByPid, pid)
+					if c != nil {
 						c.onProcessExit(pid, false)
 					}
+					continue
+				}
+				if c != nil && cg.Id != c.cgroup.Id {
+					delete(r.containersByPid, pid)
+					c.onProcessExit(pid, false)
 				}
+
 			}
 			activeIPs := map[netaddr.IP]struct{}{}
 			for id, c := range r.containersById {
@@ -301,38 +300,40 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
 					c.onListenOpen(e.Pid, e.SrcAddr, false)
 					// cmdline InstanceID agentID
-					c.buildIDs(e.Pid)
-					//c.GetCodeTypeFromCache(e.Pid)
-					//c.attachTlsUprobes(r.tracer, e.Pid)
-					// c.attachJVMUprobes(r.tracer, e.Pid)
-					//c.attachUprobes(r.tracer, e.Pid)
-					//err := c.stackTrace(r.tracer, e.Pid)
-					//if err != nil {
-					//	klog.Errorf("Stack trace error", err)
-					//}
-					// can attach
-					c.eventReady()
+					if c.buildIDs(e.Pid) {
+						c.eventReady()
+					}
+					if common.IsOpenFilter() && common.IsFilterPid(e.Pid) {
+						c.TestRegisterAppInfo(r)
+						c.attachUprobes(r.tracer, e.Pid)
+						err := c.stackTrace(r.tracer, e.Pid)
+						if err != nil {
+							klog.Errorf("Stack trace error", err)
+						}
+					}
 				} else {
 					klog.Infoln("TCP listen open from unknown container", e)
 				}
-			case ebpftracer.EventTypeListenClose:
-				if c := r.containersByPid[e.Pid]; c != nil {
-					c.onListenClose(e.Pid, e.SrcAddr)
-				}
-
 			case ebpftracer.EventTypeConnectionOpen:
 				//fmt.Println("ebpftracer.EventTypeConnectionOpen==================", e.Pid)
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
 					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
-					//c.attachTlsUprobes(r.tracer, e.Pid)
-					// c.attachJVMUprobes(r.tracer, e.Pid)
-					//c.attachUprobes(r.tracer, e.Pid)
-					//c.GetCodeTypeFromCache(e.Pid)
-					// can attach
 					c.eventReady()
+					if common.IsOpenFilter() && common.IsFilterPid(e.Pid) {
+						c.TestRegisterAppInfo(r)
+						c.attachUprobes(r.tracer, e.Pid)
+						err := c.stackTrace(r.tracer, e.Pid)
+						if err != nil {
+							klog.Errorf("Stack trace error", err)
+						}
+					}
 				} else {
 					klog.Infoln("TCP connection from unknown container", e)
 				}
+			case ebpftracer.EventTypeListenClose:
+				if c := r.containersByPid[e.Pid]; c != nil {
+					c.onListenClose(e.Pid, e.SrcAddr)
+				}
 			case ebpftracer.EventTypeConnectionError:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
 					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true)

+ 10 - 13
containers/stack.go

@@ -5,19 +5,18 @@ import (
 	"debug/elf"
 	debugelf "debug/elf"
 	"fmt"
-	"io"
-	"log"
-	"os"
-	"regexp"
-	"sort"
-	"strconv"
-
+	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
 	tracerelf "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
 	"github.com/coroot/coroot-node-agent/proc"
 	"golang.org/x/arch/arm64/arm64asm"
 	"golang.org/x/arch/x86/x86asm"
+	"io"
+	"log"
+	"os"
+	"regexp"
+	"sort"
 )
 
 type uprobesDef struct {
@@ -32,13 +31,11 @@ func (c *Container) stackTrace(tracer *ebpftracer.Tracer, pid uint32) error {
 	if tracer.DisableStackTracing() {
 		return nil
 	}
-	ENV_PID := os.Getenv("FILTER_PID")
-	if ENV_PID != "" {
-		filterPid, _ := strconv.ParseInt(ENV_PID, 10, 64)
-		if filterPid != int64(pid) {
-			return nil
-		}
+
+	if common.IsOpenFilter() && !common.IsFilterPid(pid) {
+		return nil
 	}
+
 	codeType := c.GetCodeTypeFromCache(pid)
 	if codeType.IsUnknownCode() || codeType.IsJvmCode() {
 		return nil

+ 4 - 6
ebpftracer/tracer/filter.go

@@ -4,14 +4,12 @@ import (
 	"fmt"
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/asm"
-	"os"
-	"strconv"
+	"github.com/coroot/coroot-node-agent/common"
 )
 
 func PidFilter(collectionSpec *ebpf.CollectionSpec) {
-	ENV_PID := os.Getenv("FILTER_PID")
-	if ENV_PID != "" {
-		filterPid, _ := strconv.ParseInt(ENV_PID, 10, 64)
+	filterPid, ok := common.GetFilterPid()
+	if ok {
 		type Editor struct {
 			instructions     *asm.Instructions
 			ReferenceOffsets map[string][]int
@@ -34,7 +32,7 @@ func PidFilter(collectionSpec *ebpf.CollectionSpec) {
 					continue
 					//return errors.Errorf("symbol %v: load: found %v instead of %v", symbol, load.OpCode, ldDWImm)
 				}
-				load.Constant = filterPid
+				load.Constant = int64(filterPid)
 			}
 		}
 	}

+ 7 - 4
go.mod

@@ -14,15 +14,21 @@ require (
 	github.com/docker/docker v25.0.0+incompatible
 	github.com/florianl/go-conntrack v0.3.0
 	github.com/go-kit/log v0.2.1
+	github.com/go-logr/logr v1.4.1
+	github.com/go-sql-driver/mysql v1.8.1
+	github.com/gomodule/redigo v1.9.2
 	github.com/grafana/pyroscope/ebpf v0.4.1
+	github.com/hashicorp/go-version v1.6.0
 	github.com/jpillora/backoff v1.0.0
 	github.com/mdlayher/taskstats v0.0.0-20230712191918-387b3d561d14
 	github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417
+	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.18.0
 	github.com/prometheus/client_model v0.5.0
 	github.com/prometheus/common v0.46.0
 	github.com/prometheus/prometheus v0.50.1
 	github.com/pyroscope-io/dotnetdiag v1.2.1
+	github.com/sirupsen/logrus v1.9.3
 	github.com/stretchr/testify v1.8.4
 	github.com/vishvananda/netlink v1.2.1-beta.2.0.20220608195807-1a118fe229fc
 	github.com/vishvananda/netns v0.0.4
@@ -46,6 +52,7 @@ require (
 
 require (
 	cloud.google.com/go/compute v1.23.3 // indirect
+	filippo.io/edwards25519 v1.1.0 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 // indirect
@@ -77,7 +84,6 @@ require (
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
 	github.com/go-logfmt/logfmt v0.6.0 // indirect
-	github.com/go-logr/logr v1.4.1 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/go-ole/go-ole v1.2.6 // indirect
 	github.com/go-openapi/analysis v0.21.4 // indirect
@@ -102,7 +108,6 @@ require (
 	github.com/google/uuid v1.5.0 // indirect
 	github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
-	github.com/hashicorp/go-version v1.6.0 // indirect
 	github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab // indirect
@@ -138,7 +143,6 @@ require (
 	github.com/pelletier/go-toml/v2 v2.0.5 // indirect
 	github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
 	github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
-	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 	github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
 	github.com/prometheus/common/sigv4 v0.1.0 // indirect
@@ -146,7 +150,6 @@ require (
 	github.com/samber/lo v1.38.1 // indirect
 	github.com/sasha-s/go-deadlock v0.3.1 // indirect
 	github.com/shirou/gopsutil/v3 v3.22.10 // indirect
-	github.com/sirupsen/logrus v1.9.3 // indirect
 	github.com/spf13/afero v1.9.2 // indirect
 	github.com/spf13/cast v1.5.0 // indirect
 	github.com/spf13/cobra v1.6.1 // indirect

+ 6 - 0
go.sum

@@ -41,6 +41,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
+filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
+filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
 github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
 github.com/Azure/azure-sdk-for-go v67.1.0+incompatible h1:oziYcaopbnIKfM69DL05wXdypiqfrUKdxUKrKpynJTw=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 h1:lGlwhPtrX6EVml1hO0ivjkUxsSyl4dsiw9qcA1k/3IQ=
@@ -461,6 +463,8 @@ github.com/go-openapi/validate v0.22.1 h1:G+c2ub6q47kfX1sOBLwIQwzBVt8qmOAARyo/9F
 github.com/go-openapi/validate v0.22.1/go.mod h1:rjnrwK57VJ7A8xqfpAOEKRH8yQSGUriMu5/zuPSQ1hg=
 github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqxT/8=
 github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A=
+github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
+github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
 github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
@@ -546,6 +550,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s=
+github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=

+ 185 - 177
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -4,7 +4,9 @@ import (
 	"crypto/md5"
 	"encoding/json"
 	"fmt"
-	"os"
+	. "github.com/coroot/coroot-node-agent/ebpftracer"
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
+	"github.com/coroot/coroot-node-agent/utils"
 	"sort"
 	"strconv"
 	"strings"
@@ -143,12 +145,9 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 		fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据
 		// 构建map *RootDataT
 		var rootData RootDataT
-		// todo 应用注册逻辑
-		if os.Getenv("JAVA") == "1" {
-			rootData = initRootDataJava()
-		} else {
-			rootData = initRootDataFromEvent()
-		}
+
+		rootData = initRootDataFromEvent()
+
 		// build http入口 MapInfoT
 		code_type := buildAppMapFromEvent(&rootData, sd)
 		// 构建maps
@@ -156,20 +155,17 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 			aaa, _ := json.Marshal(event)
 			fmt.Println("event.info", string(aaa))
 			mNode := buildMapNodeFromEvent(event)
-			switch event.EventType {
+			switch EventType(event.EventType) {
 			// stack
-			case 11:
+			case EventTypeFunEnt:
 			// l7 event
-			case 10:
-				switch event.ProtocolType {
-				// http
-				case 1:
+			case EventTypeL7Request:
+				switch l7.Protocol(event.ProtocolType) {
+				case l7.ProtocolHTTP:
 					buildHttpMapFromEvent(&mNode, event)
-				// mysql
-				case 5:
+				case l7.ProtocolMysql:
 					buildMysqlMapEvent(&mNode, event)
-				// redis
-				case 3:
+				case l7.ProtocolRedis:
 					buildRedisMapEvent(&mNode, event)
 				}
 			}
@@ -344,70 +340,71 @@ func buildLevelFromEvent(sdl *RootDataT) {
 	}
 }
 
-func initRootData(traceId string) RootDataT {
-	data := RootDataT{
-		AccountId:      110,
-		AgentId:        1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
-		AgentVersion:   "2.1.0",
-		AppId:          5410049101545798, // TODO 更新 基于appname生成
-		AppIdFrom:      -1,
-		AppName:        "eBPF-agent", // TODO 更新 ip:port || process_name
-		CalledId:       -1,
-		ClientIp:       "",
-		CollTime:       0,
-		Cpu:            0,
-		Custom:         "",
-		HostId:         10154813500555812,
-		HostName:       "localhost",
-		HttpCode:       0,
-		HttpMethod:     "",
-		InstanceId:     1005051101515357, // TODO 更新 基于ip:port
-		InstanceIdFrom: -1,
-		Maps:           []MapInfoT{},
-		MemU:           0,
-		MemUP:          0,
-		OperType:       "",
-		Parameters:     []interface{}{},
-		ParentTaskName: 0,
-		Period:         -1,
-		RespTime:       0,
-		Sampling:       0,
-		ServiceName:    "GO",
-		ServiceType:    APP_SERVICE_TYPE,
-		Sip:            "",
-		Sn:             "",
-		SpanIdFrom:     "",
-		Sport:          0,
-		TId:            -1,
-		TName:          "",
-		TraceId:        traceId,
-		TransIds:       []interface{}{},
-		TypeFrom:       "",
-		Uri:            "",
-		UserDir:        0,
-		VipIds:         []interface{}{},
-	}
-	return data
-}
+//func initRootData(traceId string) RootDataT {
+//	data := RootDataT{
+//		AccountId:      110,
+//		AgentId:        1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
+//		AgentVersion:   "2.1.0",
+//		AppId:          5410049101545798, // TODO 更新 基于appname生成
+//		AppIdFrom:      -1,
+//		AppName:        "eBPF-agent", // TODO 更新 ip:port || process_name
+//		CalledId:       -1,
+//		ClientIp:       "",
+//		CollTime:       0,
+//		Cpu:            0,
+//		Custom:         "",
+//		HostId:         10154813500555812,
+//		HostName:       "localhost",
+//		HttpCode:       0,
+//		HttpMethod:     "",
+//		InstanceId:     1005051101515357, // TODO 更新 基于ip:port
+//		InstanceIdFrom: -1,
+//		Maps:           []MapInfoT{},
+//		MemU:           0,
+//		MemUP:          0,
+//		OperType:       "",
+//		Parameters:     []interface{}{},
+//		ParentTaskName: 0,
+//		Period:         -1,
+//		RespTime:       0,
+//		Sampling:       0,
+//		ServiceName:    "GO",
+//		ServiceType:    APP_SERVICE_TYPE,
+//		Sip:            "",
+//		Sn:             "",
+//		SpanIdFrom:     "",
+//		Sport:          0,
+//		TId:            -1,
+//		TName:          "",
+//		TraceId:        traceId,
+//		TransIds:       []interface{}{},
+//		TypeFrom:       "",
+//		Uri:            "",
+//		UserDir:        0,
+//		VipIds:         []interface{}{},
+//	}
+//	return data
+//}
 
 func initRootDataFromEvent() RootDataT {
+	hostID, _ := utils.GetHostID()
 	data := RootDataT{
 		AccountId:      110,
-		AgentId:        1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
+		AgentId:        0, // 基于 ip:port + process_name + exe路径生成
 		AgentVersion:   "2.1.0",
-		AppId:          5410049101545798, // TODO 更新 基于appname生成
+		AppId:          0, // 基于appname生成
 		AppIdFrom:      -1,
-		AppName:        "eBPF-agent", // TODO 更新 ip:port || process_name
+		AppName:        "eBPF-agent", // server配置
 		CalledId:       -1,
 		ClientIp:       "",
 		CollTime:       0,
 		Cpu:            0,
 		Custom:         "",
-		HostId:         10154813500555812,
+		HostId:         hostID,
 		HostName:       "localhost",
 		HttpCode:       0,
 		HttpMethod:     "",
-		InstanceId:     1005051101515357, // TODO 更新 基于ip:port
+		InstanceId:     0, // 基于ip:port
 		InstanceIdFrom: -1,
 		Maps:           []MapInfoT{},
 		MemU:           0,
@@ -418,7 +415,7 @@ func initRootDataFromEvent() RootDataT {
 		Period:         -1,
 		RespTime:       0,
 		Sampling:       0,
-		ServiceName:    "GO",
+		ServiceName:    "",
 		ServiceType:    APP_SERVICE_TYPE,
 		Sip:            "",
 		Sn:             "",
@@ -561,25 +558,25 @@ func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT {
 }
 
 // 构建拼装
-func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
-	mNode, mapType := initMapNode(span(sd))
-	switch mapType {
-	case "APPLICATION":
-		buildAppMap(&mNode, traceRoot, sd)
-		traceRoot.TheEnd = true
-	case "HTTP":
-		buildHttpMap(&mNode, sd)
-	case "Mysql":
-		buildMysqlMap(&mNode, sd)
-	case "Redis":
-		buildRedisMap(&mNode, sd)
-	}
-	if mapType != "" {
-		mNode.Nid = traceRoot.Index
-		traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
-	}
-	return mNode
-}
+//func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
+//	mNode, mapType := initMapNode(span(sd))
+//	switch mapType {
+//	case "APPLICATION":
+//		buildAppMap(&mNode, traceRoot, sd)
+//		traceRoot.TheEnd = true
+//	case "HTTP":
+//		buildHttpMap(&mNode, sd)
+//	case "Mysql":
+//		buildMysqlMap(&mNode, sd)
+//	case "Redis":
+//		buildRedisMap(&mNode, sd)
+//	}
+//	if mapType != "" {
+//		mNode.Nid = traceRoot.Index
+//		traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
+//	}
+//	return mNode
+//}
 
 //func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT {
 //	mNode := buildMapNodeFromEvent(event)
@@ -598,49 +595,49 @@ func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
 //	return mNode
 //}
 
-func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
-	mNode.ServiceName = GO_SERVICE_NAME
-	mNode.ServiceType = APP_SERVICE_TYPE
-	mNode.MethodName = "net/http.(*Transport).roundTrip()"
-	mNode.Level = 1
-	mNode.Pid = 0
-	mNode.Nid = 1
-	// 构建root节点
-	traceRoot.RootData.RespTime = mNode.PureTime
-	traceRoot.RootData.CollTime = mNode.StartTime
-	traceRoot.Index = 1
-	for _, attr := range sd.Attributes() {
-		fmt.Println(attr.Key, ":", attr.Value.AsInterface())
-		switch attr.Key {
-		case "http.uri":
-			traceRoot.RootData.Uri = attr.Value.AsString()
-		case "http.method":
-			traceRoot.RootData.HttpMethod = attr.Value.AsString()
-		case "http.status_code":
-			traceRoot.RootData.HttpCode = attr.Value.AsInt64()
-		case "net.peer.name":
-			traceRoot.RootData.ClientIp = attr.Value.AsString()
-			traceRoot.RootData.Sip = attr.Value.AsString()
-			traceRoot.RootData.Sn = attr.Value.AsString()
-		case "net.peer.port":
-			traceRoot.RootData.Sport = attr.Value.AsInt64()
-			traceRoot.RootData.LocalPort = attr.Value.AsInt64()
-		case "server.trace_id_from":
-			traceRoot.RootData.TraceId = attr.Value.AsString()
-		case "server.called_id":
-			traceRoot.RootData.CalledId = attr.Value.AsInt64()
-		case "server.instance_id_from":
-			traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
-		case "server.app_id_from":
-			traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
-		case "server.span_id_from":
-			traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
-		case "server.type_from":
-			traceRoot.RootData.TypeFrom = attr.Value.AsString()
-		}
-	}
-
-}
+//func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
+//	mNode.ServiceName = GO_SERVICE_NAME
+//	mNode.ServiceType = APP_SERVICE_TYPE
+//	mNode.MethodName = "net/http.(*Transport).roundTrip()"
+//	mNode.Level = 1
+//	mNode.Pid = 0
+//	mNode.Nid = 1
+//	// 构建root节点
+//	traceRoot.RootData.RespTime = mNode.PureTime
+//	traceRoot.RootData.CollTime = mNode.StartTime
+//	traceRoot.Index = 1
+//	for _, attr := range sd.Attributes() {
+//		fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+//		switch attr.Key {
+//		case "http.uri":
+//			traceRoot.RootData.Uri = attr.Value.AsString()
+//		case "http.method":
+//			traceRoot.RootData.HttpMethod = attr.Value.AsString()
+//		case "http.status_code":
+//			traceRoot.RootData.HttpCode = attr.Value.AsInt64()
+//		case "net.peer.name":
+//			traceRoot.RootData.ClientIp = attr.Value.AsString()
+//			traceRoot.RootData.Sip = attr.Value.AsString()
+//			traceRoot.RootData.Sn = attr.Value.AsString()
+//		case "net.peer.port":
+//			traceRoot.RootData.Sport = attr.Value.AsInt64()
+//			traceRoot.RootData.LocalPort = attr.Value.AsInt64()
+//		case "server.trace_id_from":
+//			traceRoot.RootData.TraceId = attr.Value.AsString()
+//		case "server.called_id":
+//			traceRoot.RootData.CalledId = attr.Value.AsInt64()
+//		case "server.instance_id_from":
+//			traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
+//		case "server.app_id_from":
+//			traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
+//		case "server.span_id_from":
+//			traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
+//		case "server.type_from":
+//			traceRoot.RootData.TypeFrom = attr.Value.AsString()
+//		}
+//	}
+//
+//}
 
 func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 	mNode := MapInfoT{
@@ -705,37 +702,48 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 			mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
 		case "server.code_type":
 			code_type = attr.Value.AsInt64()
+		case "server.app_name":
+			traceRoot.AppName = attr.Value.AsString()
+		case "server.service_name":
+			traceRoot.ServiceName = attr.Value.AsString()
+			mNode.ServiceName = attr.Value.AsString()
+		case "server.app_id":
+			traceRoot.AppId = attr.Value.AsInt64()
+		case "server.agent_id":
+			traceRoot.AgentId = attr.Value.AsInt64()
+		case "server.instance_id":
+			traceRoot.InstanceId = attr.Value.AsInt64()
 		}
 	}
 	traceRoot.Maps = append(traceRoot.Maps, mNode)
 	return int(code_type)
 }
 
-func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
-	mNode.ServiceName = HTTP_SERVICE_NAME
-	mNode.ServiceType = HTTP_SERVICE_TYPE
-	mNode.Schema = "http"
-	mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
-	var descAddr string
-	for _, attr := range sd.Attributes() {
-		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
-		switch attr.Key {
-		case "http.ip":
-			mNode.Ip = attr.Value.AsString()
-			descAddr += mNode.Ip
-		case "http.port":
-			mNode.Port = attr.Value.AsInt64()
-			descAddr += ":" + attr.Value.AsString()
-		case "http.uri":
-			mNode.Uri = attr.Value.AsString()
-		case "http.assumed_app_id":
-			mNode.AssumedAppId = attr.Value.AsInt64()
-		case "http.span_id":
-			mNode.SpanId = attr.Value.AsString()
-		}
-	}
-	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
-}
+//func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
+//	mNode.ServiceName = HTTP_SERVICE_NAME
+//	mNode.ServiceType = HTTP_SERVICE_TYPE
+//	mNode.Schema = "http"
+//	mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
+//	var descAddr string
+//	for _, attr := range sd.Attributes() {
+//		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+//		switch attr.Key {
+//		case "http.ip":
+//			mNode.Ip = attr.Value.AsString()
+//			descAddr += mNode.Ip
+//		case "http.port":
+//			mNode.Port = attr.Value.AsInt64()
+//			descAddr += ":" + attr.Value.AsString()
+//		case "http.uri":
+//			mNode.Uri = attr.Value.AsString()
+//		case "http.assumed_app_id":
+//			mNode.AssumedAppId = attr.Value.AsInt64()
+//		case "http.span_id":
+//			mNode.SpanId = attr.Value.AsString()
+//		}
+//	}
+//	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
+//}
 
 func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
 	mNode.ServiceName = HTTP_SERVICE_NAME
@@ -770,28 +778,28 @@ func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
 	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
 }
 
-func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
-	mNode.Dbn = "unknown"
-	mNode.ServiceName = MYSQL_SERVICE_NAME
-	mNode.ServiceType = SQL_SERVICE_TYPE
-	mNode.MethodName = "database/sql.Query()"
-	for _, attr := range sd.Attributes() {
-		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
-		switch attr.Key {
-		case "net.peer.name":
-			mNode.Ip = attr.Value.AsString()
-		case "net.peer.port":
-			mNode.Port = attr.Value.AsInt64()
-		case "db.statement":
-			query := attr.Value.AsString()
-			mNode.Ps = []string{query}
-			words := strings.Fields(query)
-			if len(words) > 0 {
-				mNode.OperType = strings.ToUpper(words[0])
-			}
-		}
-	}
-}
+//func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
+//	mNode.Dbn = "unknown"
+//	mNode.ServiceName = MYSQL_SERVICE_NAME
+//	mNode.ServiceType = SQL_SERVICE_TYPE
+//	mNode.MethodName = "database/sql.Query()"
+//	for _, attr := range sd.Attributes() {
+//		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+//		switch attr.Key {
+//		case "net.peer.name":
+//			mNode.Ip = attr.Value.AsString()
+//		case "net.peer.port":
+//			mNode.Port = attr.Value.AsInt64()
+//		case "db.statement":
+//			query := attr.Value.AsString()
+//			mNode.Ps = []string{query}
+//			words := strings.Fields(query)
+//			if len(words) > 0 {
+//				mNode.OperType = strings.ToUpper(words[0])
+//			}
+//		}
+//	}
+//}
 
 func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	mNode.Dbn = "unknown"

+ 25 - 11
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/go.mod

@@ -1,28 +1,40 @@
 module go.opentelemetry.io/otel/exporters/otlp/otlptrace
 
-go 1.20
+go 1.21
+
+toolchain go1.21.3
 
 require (
-	github.com/google/go-cmp v0.5.9
+	github.com/coroot/coroot-node-agent v0.0.0-00010101000000-000000000000
+	github.com/google/go-cmp v0.6.0
 	github.com/stretchr/testify v1.8.4
-	go.opentelemetry.io/otel v1.19.0
-	go.opentelemetry.io/otel/sdk v1.19.0
-	go.opentelemetry.io/otel/trace v1.19.0
+	go.opentelemetry.io/otel v1.22.0
+	go.opentelemetry.io/otel/sdk v1.22.0
+	go.opentelemetry.io/otel/trace v1.22.0
 	go.opentelemetry.io/proto/otlp v1.0.0
-	google.golang.org/protobuf v1.31.0
+	google.golang.org/protobuf v1.32.0
 )
 
 require (
-	github.com/davecgh/go-spew v1.1.1 // indirect
-	github.com/go-logr/logr v1.2.4 // indirect
+	github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
+	github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
+	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
+	github.com/go-logr/logr v1.4.1 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
+	github.com/hashicorp/go-version v1.6.0 // indirect
 	github.com/kr/pretty v0.3.1 // indirect
-	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 	github.com/rogpeppe/go-internal v1.10.0 // indirect
-	go.opentelemetry.io/otel/metric v1.19.0 // indirect
-	golang.org/x/sys v0.12.0 // indirect
+	github.com/sirupsen/logrus v1.9.3 // indirect
+	go.opentelemetry.io/otel/metric v1.22.0 // indirect
+	go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
+	go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
+	golang.org/x/sys v0.18.0 // indirect
+	gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
 	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
+	inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a // indirect
+	k8s.io/klog/v2 v2.120.1 // indirect
 )
 
 replace go.opentelemetry.io/otel => ../../..
@@ -32,3 +44,5 @@ replace go.opentelemetry.io/otel/sdk => ../../../sdk
 replace go.opentelemetry.io/otel/trace => ../../../trace
 
 replace go.opentelemetry.io/otel/metric => ../../../metric
+
+replace github.com/coroot/coroot-node-agent => ../../../../../../

+ 61 - 14
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/go.sum

@@ -1,15 +1,22 @@
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs=
+github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
-github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
+github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dvyukov/go-fuzz v0.0.0-20210103155950-6a8e9d1f2415/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw=
 github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
-github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
-github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
+github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
-github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
-github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
-github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
+github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
 github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
@@ -18,23 +25,63 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
+github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
+github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
 go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
-golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
-golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
-google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
-google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+go4.org/intern v0.0.0-20211027215823-ae77deb06f29 h1:UXLjNohABv4S58tHmeuIZDO6e3mHpW2Dx33gaNt03LE=
+go4.org/intern v0.0.0-20211027215823-ae77deb06f29/go.mod h1:cS2ma+47FKrLPdXFpr7CuxiTW3eyJbWew4qx0qtQWDA=
+go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
+go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 h1:WJhcL4p+YeDxmZWg141nRm7XC8IDmhz7lk5GpadO1Sg=
+go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
+golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
+google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a h1:1XCVEdxrvL6c0TGOhecLuB7U9zYNdxZEjvOqJreKZiM=
+inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a/go.mod h1:e83i32mAQOW1LAqEIweALsuK2Uw4mhQadA5r7b0Wobo=
+k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
+k8s.io/klog/v2 v2.120.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=

+ 8 - 2
tracing/apm_tracing.go

@@ -124,7 +124,7 @@ func (t *Trace) AllEventReady(traceID uint64) bool {
 	return t.startEventReady && t.endEventReady && *t.currenEventCount >= t.needEventCount
 }
 
-func (t *Trace) TraceStartEvent(method, path string, status l7.Status, addr netaddr.IPPort, codeType common.CodeType) {
+func (t *Trace) TraceStartEvent(method, path string, status l7.Status, addr netaddr.IPPort, pid uint32, appInfo common.AppInfo) {
 	t.span.SetAttributes(semconv.HTTPURL(fmt.Sprintf("http://%s%s", addr.String(), path)),
 		semconv.HTTPMethod(method),
 		attribute.String("http.uri", path))
@@ -135,7 +135,13 @@ func (t *Trace) TraceStartEvent(method, path string, status l7.Status, addr neta
 	t.commonAttrs = []attribute.KeyValue{
 		semconv.NetPeerName(addr.IP().String()),
 		semconv.NetPeerPort(int(addr.Port())),
-		attribute.Int("server.code_type", codeType.Int()),
+		// buildAppMapFromEvent
+		attribute.Int("server.code_type", appInfo.CodeType.Int()),
+		attribute.String("server.app_name", appInfo.AppName),
+		attribute.String("server.service_name", appInfo.ServiceName),
+		attribute.Int64("server.app_id", appInfo.AppId),
+		attribute.Int64("server.agent_id", appInfo.AgentId),
+		attribute.Int64("server.instance_id", appInfo.InstanceId),
 	}
 	t.span.SetAttributes(t.commonAttrs...)
 	t.startReady()

+ 447 - 0
utils/modelse/models.go

@@ -0,0 +1,447 @@
+package modelse
+
+import (
+	"encoding/json"
+	"fmt"
+
+	log "github.com/sirupsen/logrus"
+)
+
+var (
+	Ip            = ""
+	StopAgent     = "agent_stop"
+	StartAgent    = "agent_start"
+	ReStartAgent  = "agent_restart"
+	UpgradeAgent  = "agent_upgrade"
+	RemoveAgent   = "agent_uninstall"
+	InstallAgent  = "agent_install"
+	FuseConf      = "daemon_fuse"
+	RestartDaemon = "daemon_restart"
+	FuseUpdate    = "agent_fusingrule_update"
+	ConfUpdate    = "agent_config_update"
+	PipePath      = ""
+
+	ProxyNginxName = "proxy"
+	LocalNginxName = "gateway"
+	IsUseProxy     = false
+	DataPort       = ""
+	ConfigPort     = ""
+	FilePort       = ""
+
+	WorkPath = ""
+
+	// 配置下发
+	ConfigurateDistribute = "configuration_distribute"
+	// 熔断下发
+	FuseRuleDistribute = "fusingrule_distribute"
+
+	// 任务操作
+	CreateRunner = "collector_start"
+	StopRunner   = "collector_stop"
+	ReSetRunner  = "collector_reset"
+	DeleteRunner = "collector_delete"
+
+	// Daemon操作
+	UpgradeDaemon   = "daemon_upgrade"
+	UninstallDaemon = "daemon_uninstall"
+
+	// 任务状态
+	Running = "running"
+	Success = "success"
+	Fail    = "fail"
+	InValid = "invalid"
+)
+
+// 状态编码
+var (
+	SuccessCode = 100000
+
+	// agent 获取失败
+	AgentGetErrorCode = 200000
+
+	// agent 启动错误码
+	AgentStartupErrorCode = 200001
+	// agent 停止错误码
+	AgentStopErrorCode = 200002
+	// agent 重启错误码
+	AgentRestartErrorCode = 200003
+	// agent 卸载错误码
+	AgentUninstallErrorCode = 200004
+	// agent 安装失败
+	AgentInstallErrorCode = 200005
+	// agent 更新熔断规则失败
+	AgentFuseUpdateErrorCode = 200006
+
+	// agent 升级失败
+	AgentUpgradeErrorCode = 200007
+
+	// distribute configuration failed
+	DistributeConfigErrorCode = 200008
+
+	DistributeFuseErrorCode = 200009
+
+	// agent任务失败
+	RunnerErrorCode = 300001
+
+	// daemon 升级失败
+	DaemonUpgradeErrorCode = 400001
+
+	// daemon 卸载失败
+	DaemonDeleteErrorCode = 400002
+
+	// daemon 卸载失败
+	DaemonRestartErrorCode = 400003
+
+	// 任务被丢弃
+	FailExecByHighLevelTask = 500001
+
+	// 任务无效
+	InvaildTask = 500002
+)
+
+type RegisterAppReq struct {
+	AppId       int64  `json:"appId"`
+	AppName     string `json:"appName"`
+	AccountId   int    `json:"accountId"`
+	AgentId     int64  `json:"agentId"`
+	Sn          string `json:"sn"`
+	Sport       int    `json:"sport"`
+	ServiceType string `json:"serviceType"`
+	CodeType    int    `json:"codeType"`
+	App_type    int    `json:"app_type"`
+	HostId      int64  `json:"hostId"`
+	//AppNameAnalysis []map[string]string `json:"app_name_analysis"`
+	//CwAppTransform  string              `json:"cw_app_transform"`
+	//OrgCode         string              `json:"org_code"`
+	//AppZone         *string             `json:"app_zone"`
+}
+
+func (r *RegisterAppReq) Print() {
+	a, err := json.Marshal(r)
+	if err != nil {
+		log.Error(err)
+	}
+	println(string(a))
+}
+
+type WhiteListReq struct {
+	HostId    int64 `json:"host_id"`
+	AccountId int   `json:"account_id"`
+}
+
+type WhiteListResp struct {
+	Code int       `json:"code"`
+	Msg  string    `json:"msg"`
+	Data WhiteData `json:"data"`
+}
+
+type WhiteSettingInfo struct {
+	AppName         string `json:"app_name"`
+	K8SNamespace    string `json:"k8s_namespace"`
+	K8SWorkLoadName string `json:"k8s_workLoadName"`
+	Filters         string `json:"filters"`
+	Type            string `json:"type"`
+	PodName         string `json:"pod_name"`
+}
+
+type Appscope struct {
+	Settings []WhiteSettingInfo `json:"settings"`
+	Code     string             `json:"code"`
+}
+
+type WhiteData struct {
+	Executable      []string   `json:"executable"`
+	OneagentEnable  int        `json:"oneagent_enable"`
+	LastUpdatedTime int        `json:"last_updated_time"`
+	Appscope        []Appscope `json:"appscope"`
+}
+
+type ReportRequest struct {
+	RegistRequest
+	AgentID     string `json:"agent_id"`
+	StateDetail struct {
+		Msg string `json:"msg"`
+	} `json:"statedetail"`
+	State          string `json:"state"`
+	ConnectionTest bool   `json:"_connection_test"`
+}
+
+// 注册请求数据结构
+type RegistRequest struct {
+	Pid                 int         `json:"agent_pid"`
+	User                string      `json:"agent_run_user"`
+	IP                  string      `json:"ip"`
+	HostName            string      `json:"host_name"`
+	DaemonInstanceID    string      `json:"daemon_instance_id"`
+	ContentIdentity     string      `json:"contentIdentity"`
+	DaemonVersion       string      `json:"daemon_version"`
+	Os                  string      `json:"os"`
+	OsVersion           string      `json:"os_version"`
+	CPUCount            int         `json:"cpu_count"`
+	Mem                 int64       `json:"mem"`
+	Timestamp           int64       `json:"timestamp"`
+	Agents              []AgentInfo `json:"agents"`
+	Status              string      `json:"status"`
+	Tags                string      `json:"tags"`
+	TagNames            string      `json:"tagNames"`
+	AgentID             string      `json:"agent_ids"`
+	ConfigIds           string      `json:"config_ids"`
+	IsTestingConnection bool        `json:"is_connection_test"`
+	Parent              string      `json:"parent"`
+	Connect             string      `json:"connect"`
+	OperatingEnv        string      `json:"operatingEnv"`
+	*K8SInfo                        // k8s,容器化
+
+	SystemVersion string `json:"system_version"`
+
+	// 新增安装配置项
+	Identification      string `json:"identification"`
+	ClusterId           string `json:"cluster_id"`
+	ProxyLocalIp        string `json:"proxy_local_ip"`
+	PluginInstallParams string `json:"plugin_install_params"`
+
+	// 新增系统 uuid
+	SystemUUID string `json:"system_uuid"`
+	HostID     string `json:"host_id"`
+}
+
+// k8s,容器化
+type K8SInfo struct {
+	Host_type string `json:"host_type,omitempty"`
+	Namespace string `json:"namespace"` // 命名空间
+	// Cluster_name   string `json:"cluster_name"`
+	Pod_name      string `json:"pod_name"`         // 容器组名称
+	Pod_ip        string `json:"pod_ip"`           // 容器组ip
+	Node_name     string `json:"node_name"`        // 节点名称
+	Node_ip       string `json:"node_ip"`          // 节点ip
+	Cluster_name  string `json:"k8s_cluster_name"` // 集群名称
+	ImageName     string `json:"image_name"`       // 镜像名称
+	ImageVersion  string `json:"image_version"`    // 镜像版本
+	K8sModuleName string `json:"k8s_module_name"`  // k8s模块名称
+}
+
+type AgentInfo struct {
+	IsNamePipe           bool                 `json:"pipe"`
+	EnableDeepDiscover   bool                 `json:"enable_deep_discover"`
+	Pid                  int                  `json:"pid"`
+	ConfigPath           string               `json:"config"`
+	User                 string               `json:"username"`
+	Agent_instance_id    string               `json:"agent_instance_id"`
+	Agent_id             string               `json:"agent_id"`
+	Version              string               `json:"version"`
+	Url                  string               `json:"url"`
+	Collectors           []Collectors         `json:"collectors"`
+	P_resource_occupancy []Resource_occupancy `json:"p_resource_occupancy"`
+	Fusing               string               `json:"fusing"`
+	FusingVersion        float64              `json:"fusing_rules_version"`
+	Fusing_condition     Fusing_condition     `json:"fusing_condition"`
+	// 注册时安装配置文件的ID
+	ConfigId           string `json:"configId"`
+	FusingId           string `json:"fusingId"`
+	DepthDiscoverCycle string `json:"cdc_cycle"`
+	Prefix             string `json:"prefix"` // cdc接口路由前缀
+	// 日志路径字段
+	LogPath string `json:"log_path"`
+
+	// proxy 内网ip
+	ProxyLocalIp string `json:"proxy_local_ip"`
+}
+
+type TaskStatus struct {
+	TaskId     string `json:"taskid"`
+	TaskStatus string `json:"taskStatus"`
+}
+
+type Collectors struct {
+	Name            interface{}            `json:"name"`
+	Status          string                 `json:"status"`
+	CollectorConfig map[string]interface{} `json:"collectorConfig"`
+	Id              string                 `json:"id"`
+	Type            string                 `json:"type"`
+}
+
+// 资源使用
+type Resource_occupancy struct {
+	Pid int     `json:"pid"`
+	Cpu float64 `json:"cpu"`
+	Mem float32 `json:"mem"`
+}
+
+// agent熔断规则
+type Fusing_condition struct {
+	Cpu    int `json:"cpu"`
+	Memory int `json:"memory"`
+	Disk   int `json:"disk"`
+}
+
+// 结果解析
+type RespJson struct {
+	Code       int             `json:"code"`
+	Msg        string          `json:"msg"`
+	Status     string          `json:"status"`
+	Encryption bool            `json:"encryption"`
+	Data       json.RawMessage `json:"data"`
+}
+
+type RegistorRespData struct {
+	Agents []AgentList `json:"agents"`
+	Ip     string      `json:"realIp"`
+	// server 当前时间戳
+	ServerNow         int64              `json:"serverNow"`
+	AgentInstallInfos []AgentInstallInfo `json:"agentInstallInfos"`
+}
+
+type AgentList struct {
+	Agent_instance_id     string                 `json:"path"`
+	Status                string                 `json:"status"`
+	Collectors            []Collectors           `json:"collectors"`
+	FusingRulesVersion    float64                `json:"fusing_rules_version"`
+	Default_FusingContent map[string]interface{} `json:"default_FusingContent"`
+}
+
+type AgentInstallInfo struct {
+	AgentId     string                 `json:"agentId"`
+	InstallPath string                 `json:"installPath"`
+	Config      map[string]interface{} `json:"config"`
+	ConfigId    string                 `json:"configId"`
+	FusingId    string                 `json:"fusingId"`
+}
+
+type HeartBeatInfo struct {
+	Fusing               bool                 `json:"fusing"`
+	Pid                  int                  `json:"agent_pid"`
+	Status               int                  `json:"fuse_status"`
+	FusingVersion        float64              `json:"fusing_rules_version"`
+	AgentInstanceID      string               `json:"agent_instance_id"`
+	ConfVersion          string               `json:"contentIdentity"`
+	User                 string               `json:"agent_run_user"`
+	P_resource_occupancy []map[string]float64 `json:"p_resource_occupancy"`
+	Fusing_condition     map[string]float64   `json:"fusing_condition"`
+}
+
+type DaemonHeartBeatInfo struct {
+	Status               int                  `json:"status"`
+	Daemon_instance_id   string               `json:"daemon_instance_id"`
+	ContentIdentity      string               `json:"contentIdentity"`
+	P_resource_occupancy []map[string]float64 `json:"p_resource_occupancy"`
+	HeartBeatInfo        []HeartBeatInfo      `json:"agents"`
+	Timestamp            int64                `json:"timestamp"`
+	Alive_agent          []string             `json:"agent_instance_ids"` // 离线agentid上报
+}
+
+type HeartConfContent struct {
+	Content   string `json:"content"`
+	ContentID string `json:"contentIdentity"`
+}
+
+type HeartJsonConf struct {
+	Content   json.RawMessage `json:"content"`
+	ContentID string          `json:"contentIdentity"`
+}
+
+type FuseContent struct {
+	Content json.RawMessage `json:"fusing"`
+}
+
+type HB_Config struct {
+	HB_Config_Detail map[string]HeartConfContent `json:"config"`
+}
+
+type Task struct {
+	TaskId          string                 `json:"taskId"`
+	Detail          Detail                 `json:"detail"`
+	Status          string                 `json:"status"`
+	TimeStamp       int64                  `json:"timestamp"`
+	TaskConfig      map[string]interface{} `json:"taskConfig"`
+	AgentInstanceId string                 `json:"agentPath"`
+	Operate         string                 `json:"operate"`
+}
+
+func (t *Task) GetTaskConfigString(key string) (string, string) {
+	err := ""
+	v, ok := t.TaskConfig[key]
+	if !ok {
+		log.Errorf("task is %v Get value err, Can not get %s", t, key)
+		err = fmt.Sprintf("task id is %s, Get value err, Can not get %s", t.TaskId, key)
+		return "", err
+	}
+	result, ok := v.(string)
+	if !ok {
+		log.Errorf("task is %v Get value err, Value is not a string %v", t, v)
+		err = fmt.Sprintf("task id is %s, Get value err, Value is not a string %v", t.TaskId, v)
+		return "", err
+	}
+	return result, ""
+}
+
+func (t *Task) GetTaskConfigAny(key string) (interface{}, string) {
+	err := ""
+	v, ok := t.TaskConfig[key]
+	if !ok {
+		log.Warnf("task is %v Get value err, Can not get %s", t, key)
+		err = fmt.Sprintf("task id is %s, Get value err, Can not get %s", t.TaskId, key)
+		return nil, err
+	}
+	return v, ""
+}
+
+type Detail struct {
+	Msg string `json:"msg"`
+	AgentInfo
+}
+
+type TaskResult struct {
+	Code      int    `json:"code"`
+	Task_Id   string `json:"taskId"`
+	Detail    Detail `json:"detail"`
+	Status    string `json:"status"`
+	TimeStamp int64  `json:"timestamp"`
+}
+
+// Agent心跳结果解析
+type HeartBeatRespJson struct {
+	Code string         `json:"code"`
+	Data AgentHeartbeat `json:"data"`
+}
+
+type AgentHeartbeat struct {
+	P_resource_occupancy []map[string]float64 `json:"p_resource_occupancy"`
+	Fusing               bool                 `json:"fusing"`
+	FusingVersion        float64              `json:"fusing_rules_version"`
+	Fusing_condition     map[string]int       `json:"fusing_condition"`
+	ConfVersion          string               `json:"contentIdentity"`
+}
+
+// 熔断功能对象创建
+var (
+	Pid    int
+	CPUNUM float64
+)
+
+// 资源对象,包括资源限制名称、资源的阈值、阈值触发次数、阈值触发次数状态为熔断
+type Rule struct {
+	ThresholdTimes int     // 触发阈值的次数统计
+	RecoverTimes   int     // 恢复次数统计
+	Threshold      float64 // 阈值
+	Enable         bool    // 是否启用
+}
+
+// 解析agent的数据结构
+type Result struct {
+	Timestamp int64              `json:"timestamp"`
+	Pid       int                `json:"pid"`
+	Command   string             `json:"command"`
+	Class     string             `json:"type"`
+	Metrics   map[string]float64 `json:"metric"`
+}
+
+// 熔断规则配置数据结构
+type FuseConfig struct {
+	TholdCount          int                `json:"tholdCount"`          // 多少次阈值触发熔断
+	RecoveryCount       int                `json:"recoveryCount"`       // 多少次正常可以恢复agent的状态
+	Window              int                `json:"monFrequency"`        // 多长时间采集一次agent的指标数据
+	Version             int                `json:"version"`             // 熔断规则版本
+	ShakeCountThreshold int                `json:"shakeCountThreshold"` // 抖动次数阈值
+	Cycle               int                `json:"cycle"`               // 抖动触发周期
+	Threshold           map[string]float64 `json:"threshold"`           // 熔断规则阈值
+}

+ 12 - 0
utils/util.go

@@ -0,0 +1,12 @@
+package utils
+
+import (
+	"fmt"
+	log "github.com/sirupsen/logrus"
+	"runtime/debug"
+)
+
+func CatchFn(err interface{}) {
+	log.Errorf(fmt.Sprintf("panic : %s\n", err))
+	log.Errorf(fmt.Sprint(string(debug.Stack())))
+}

+ 346 - 0
utils/worker/proxySender.go

@@ -0,0 +1,346 @@
+package worker
+
+import (
+	"bytes"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"fmt"
+	//"github.com/cloudwise/agentdaemon/src/util"
+	//"github.com/cloudwise/agentdaemon/src/util/conf/dynamicLoad/config"
+	log "github.com/sirupsen/logrus"
+	"io"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+var gProxyClient *Client
+
+const CheckHealthy = 10 * time.Second
+const FailCount = 3
+
+type Client struct {
+	Endpoints []*Endpoint
+	Current   uint32
+	Mutex     sync.RWMutex
+}
+
+type Endpoint struct {
+	Url       string
+	FailCount uint32
+	LastFail  time.Time
+	Available atomic.Value
+	Mutex     sync.RWMutex
+}
+
+func (e *Endpoint) Failed() {
+	e.Mutex.Lock()
+	defer e.Mutex.Unlock()
+
+	atomic.AddUint32(&e.FailCount, 1)
+	e.LastFail = time.Now()
+
+	if e.FailCount >= FailCount {
+		e.Available.Store(false)
+	}
+}
+
+func (e *Endpoint) Succeeded() {
+	e.Mutex.Lock()
+	defer e.Mutex.Unlock()
+
+	atomic.StoreUint32(&e.FailCount, 0)
+	e.Available.Store(true)
+}
+
+func (c *Client) GetStatus() bool {
+	failAllEndpoint := false
+	for _, v := range c.Endpoints {
+		if v.Available.Load().(bool) {
+			failAllEndpoint = true
+		}
+	}
+	return failAllEndpoint
+}
+
+func (c *Client) DoWithNextEndpoint(method string, uri string, data []byte, header map[string]string, timeOut time.Duration) (*http.Response, error, bool) {
+	c.Mutex.Lock()
+	defer c.Mutex.Unlock()
+
+	var (
+		endpoint *Endpoint
+		index    uint32
+		skipNum  uint32
+		noPoints bool
+	)
+	endpointsNum := len(c.Endpoints)
+
+	if !c.GetStatus() {
+		noPoints = true
+		return nil, errors.New("all endpoints are unavailable."), noPoints
+	}
+
+	for i := uint32(0); i < uint32(endpointsNum); i++ {
+		index = (atomic.LoadUint32(&c.Current) + i) % uint32(endpointsNum)
+		if c.Endpoints[index].Available.Load().(bool) {
+			endpoint = c.Endpoints[index]
+			break
+		}
+		skipNum++
+	}
+
+	proxyUrl := endpoint.Url + uri
+	log.WithField("module", "proxy").Debugf("proxy is <%s>", proxyUrl)
+
+	req, err := http.NewRequest(method, proxyUrl, bytes.NewBuffer(data))
+
+	if header != nil {
+		contentType, ok := header["Content-Type"]
+		if ok {
+			req.Header.Set("Content-Type", contentType)
+			delete(header, "Content-Type")
+		}
+		for k, v := range header {
+			req.Header.Add(k, v)
+		}
+	}
+	if err != nil {
+		return nil, err, noPoints
+	}
+	// 加载自签名证书
+	tlsConfig := &tls.Config{InsecureSkipVerify: true}
+	//certFile := util.GetDefaultPath(util.MetaPath, "cert.pem")
+	certFile := ""
+	_, pemErr := os.Stat(certFile)
+	if pemErr == nil {
+		cert, err := ioutil.ReadFile(certFile)
+		if err != nil {
+			log.Fatal(err)
+		}
+		certPool := x509.NewCertPool()
+		certPool.AppendCertsFromPEM(cert)
+		// 配置TLS客户端
+		tlsConfig = &tls.Config{RootCAs: certPool}
+	}
+	client := &http.Client{
+		Timeout: timeOut,
+		Transport: &http.Transport{
+			Dial: func(netw, addr string) (net.Conn, error) {
+				c, err := net.DialTimeout(netw, addr, time.Second*time.Duration(3))
+				if err != nil {
+					return nil, err
+				}
+				return c, nil
+			},
+			ResponseHeaderTimeout: time.Second * time.Duration(10),
+			MaxIdleConnsPerHost:   20,
+			IdleConnTimeout:       time.Second * time.Duration(10),
+			TLSClientConfig:       tlsConfig,
+		},
+	}
+	resp, err := client.Do(req)
+	log.WithError(err).Infof("endpoint %s%s body <%s>", endpoint.Url, uri, string(data))
+	if err != nil {
+		endpoint.Failed()
+		return nil, errors.New(fmt.Sprintf("endpoint %s url %s body %s error %v", endpoint.Url, uri, string(data), err)), noPoints
+	}
+
+	if resp.StatusCode >= 400 {
+		endpoint.Failed()
+		return nil, fmt.Errorf("endpoint returned %d status code", resp.StatusCode), noPoints
+	}
+
+	endpoint.Succeeded()
+
+	atomic.AddUint32(&c.Current, 1+skipNum)
+	return resp, nil, noPoints
+}
+
+func (c *Client) CheckEndpoints() {
+	ticker := time.NewTicker(CheckHealthy)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		c.Mutex.Lock()
+		for _, endpoint := range c.Endpoints {
+			endpoint.Mutex.Lock()
+			if !endpoint.Available.Load().(bool) && time.Since(endpoint.LastFail) > CheckHealthy {
+				endpoint.Available.Store(true)
+				endpoint.FailCount = 0
+			}
+			endpoint.Mutex.Unlock()
+		}
+		c.Mutex.Unlock()
+	}
+}
+
+func (c *Client) SetEndpoint(endPointStr string, ssl bool) error {
+	if endPointStr == "" {
+		return fmt.Errorf("endpoints can not empty.")
+	}
+	c.Mutex.Lock()
+	defer c.Mutex.Unlock()
+	serverList := strings.Split(endPointStr, ",")
+	var endpoints []*Endpoint
+	var available atomic.Value
+	phonetic := "http"
+	if ssl {
+		phonetic = "https"
+	}
+	available.Store(true)
+	for _, server := range serverList {
+
+		if !strings.HasPrefix(server, phonetic) {
+			server = fmt.Sprintf("%s://%s", phonetic, server)
+		}
+		log.WithField("module", "proxy").Debugf("proxy server %s", server)
+		endpoints = append(endpoints, &Endpoint{
+			Url:       server,
+			FailCount: 0,
+			LastFail:  time.Time{},
+			Available: available,
+			Mutex:     sync.RWMutex{},
+		})
+	}
+	c.Endpoints = endpoints
+	c.Current = 0
+	return nil
+}
+
+func (c *Client) ProxyRequest(method string, uri string, data []byte, header map[string]string) ([]byte, error) {
+	u, err := url.Parse(uri)
+	if err != nil {
+		log.WithField("module", "proxy").WithError(err).Errorf("url Parse error")
+		return nil, err
+	}
+	uri = u.RequestURI()
+	for {
+		resp, err, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(60)*time.Second)
+		if noPoints {
+			log.WithField("module", "proxy").WithError(err).Errorf("proxy has no healthy points")
+			return nil, err
+		}
+		if err != nil {
+			log.WithField("module", "proxy").WithError(err).Errorf("proxy resp faild")
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+		defer func() {
+			if resp != nil {
+				resp.Body.Close()
+			}
+		}()
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			log.WithField("module", "proxy").WithError(err).Errorf("proxy ReadAll Body faild")
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+
+		log.WithField("module", "proxy").Debugf("response: is %s", body)
+
+		return body, err
+	}
+}
+
+func (c *Client) ProxyRespRequest(method string, uri string, data []byte, header map[string]string) (*http.Response, error) {
+	u, err := url.Parse(uri)
+	if err != nil {
+		return nil, err
+	}
+	uri = u.RequestURI()
+	for {
+		resp, err, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(600)*time.Second)
+		if noPoints {
+			log.WithField("module", "proxy").WithError(err).Errorf("proxy has no healthy points")
+			return nil, err
+		}
+		if err != nil {
+			log.WithField("module", "proxy").WithError(err).Errorf("proxy resp faild")
+			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+		return resp, err
+	}
+}
+
+func NewProxyClient(endPointStr string, ssl bool) (*Client, error) {
+	if gProxyClient != nil {
+		return gProxyClient, nil
+	}
+	var err error
+	client := &Client{Mutex: sync.RWMutex{}}
+	err = client.SetEndpoint(endPointStr, ssl)
+	gProxyClient = client
+	return client, err
+}
+
+func GetProxyClient() (*Client, error) {
+	if gProxyClient != nil {
+		return gProxyClient, nil
+	} else {
+		return nil, fmt.Errorf("ProxyClient is nil")
+	}
+}
+
+func DownloadPackage(url, path string) error {
+	http.DefaultClient.Timeout = time.Duration(600) * time.Second
+	//htmlData, err := http.Get(url)
+	proxyClient, _ := GetProxyClient()
+	htmlData, err := proxyClient.ProxyRespRequest(http.MethodGet, url, nil, nil)
+
+	if err != nil {
+		log.WithError(err).Errorf("There are some network errors.")
+		return err
+	}
+	defer htmlData.Body.Close()
+	if htmlData == nil {
+		return errors.New("Install agent failed,No data was obtained from the URL,URL is" + url)
+	}
+	file, err := os.Create(path)
+	if err != nil {
+		log.Errorf("download package create path[%s] error:%s", path, err)
+		return err
+	}
+	defer file.Close()
+
+	buf := make([]byte, 1024*5)
+	totalSize := 0
+	totalSizeBak := 0
+	lastTime := time.Now()
+
+	for {
+		size, err := htmlData.Body.Read(buf)
+		if err != nil && err != io.EOF {
+			log.WithError(err).Errorf("There were some errors writing to the file during download.")
+			return err
+		}
+		if size == 0 {
+			break
+		}
+		file.Write(buf[:size])
+		totalSize += size
+		if totalSize-totalSizeBak > 5*1024*1024 {
+			totalSizeBak = totalSize
+			log.Infof("the current download size is %d MB", totalSize/1024/1024)
+			elapsedTime := time.Since(lastTime)
+			expectedTime := time.Second
+			if expectedTime > elapsedTime {
+				sleepTime := expectedTime - elapsedTime
+				if sleepTime > 0 {
+					time.Sleep(sleepTime)
+				}
+			}
+			lastTime = time.Now()
+		}
+	}
+	log.Infof("the current download size is %d MB", totalSize/1024/1024)
+	return nil
+}

+ 368 - 0
utils/worker/serverWorker.go

@@ -0,0 +1,368 @@
+package worker
+
+import (
+	"encoding/json"
+	"fmt"
+	. "github.com/coroot/coroot-node-agent/utils/modelse"
+	log "github.com/sirupsen/logrus"
+	"net/http"
+	"sync"
+	//
+	//log "github.com/sirupsen/logrus"
+)
+
+const (
+	registerModel    = 0
+	connectTestModel = iota
+	syncAgentInfoModel
+	heartModel
+	receiveTaskModel
+	reportTaskResultModel
+	installReportModel
+
+	urlModel  = "%s/api/ext/gaia/daemon/%s"
+	urlModel_ = "%s/api/ext/gaia/daemon/%s/%s"
+)
+
+type ServerWorker interface {
+	//InstallReport(r ReportRequest) error
+
+	RegisterApp(RegisterAppReq) error
+
+	WhiteList(WhiteListReq) (WhiteData, error)
+
+	//SyncAgentInfo(RegistRequest) ([]AgentList, error)
+
+	//Heart(DaemonHeartBeatInfo) (map[string]HeartConfContent, error)
+
+	//ReceiveTask([]interface{}) ([]Task, error)
+
+	//ReportTaskResult([]TaskResult) ([]byte, error)
+
+	//SetToken(token string)
+
+	//GetInfo() (string, string)
+}
+
+func (w *ServerHTTPWorker) RegisterApp(request RegisterAppReq) error {
+	log.Infof("RegisterApp request:%v.", request)
+	_, err := w.requestServer("/v2/app/create", request)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (w *ServerHTTPWorker) WhiteList(request WhiteListReq) (WhiteData, error) {
+	log.Infof("WhiteList request:%v.", request)
+	response := WhiteData{}
+
+	//log.Infof("WhiteList request:%s.", string(data))
+	result, err := w.requestServer("/api/v1/agent/whitelist", request)
+	if err != nil {
+		return response, err
+	}
+
+	err = json.Unmarshal(result, &response)
+	//fmt.Println(string(result))
+	if err != nil {
+		log.Errorf("WhiteList unmarshal response error:%s.", err)
+		return response, err
+	}
+	return response, nil
+}
+
+type ServerHTTPWorker struct {
+	currentIndex    int // 标识当前使用的是那个链接方式
+	countTotal      int // 在重试链接server时,统计重试的次数,用来是否遍历了一轮
+	mux             *sync.Mutex
+	accountID       int
+	hostID          string
+	registUrl       string
+	installReport   string
+	heartUrl        string
+	taskUrl         string
+	returnTaskUrl   string
+	installAgentUrl string
+	token           string
+	connectTestUrl  string
+	doccServer      string
+	daemonId        string
+	connectList     []string
+	tokenList       []string
+	proxyClient     *Client
+}
+
+func NewServerHTTPWorker() (*ServerHTTPWorker, error) {
+	s := &ServerHTTPWorker{
+		accountID: 1,
+		//daemonId:     daemonId,
+		//currentIndex: -1,
+		//connectList:  strings.Split(connect, ","),
+		//tokenList:    strings.Split(token, ","),
+		mux: new(sync.Mutex),
+	}
+	s.proxyClient, _ = GetProxyClient()
+	return s, nil
+}
+
+//	func (w *serverHTTPWorker) updateUrl(index int) int {
+//		w.mux.Lock()
+//		defer w.mux.Unlock()
+//		// if index != w.currentIndex {
+//		// return w.currentIndex
+//		// }
+//		w.currentIndex = (w.currentIndex + 1) % len(w.connectList)
+//		connect := ""
+//		length := len(w.tokenList)
+//		if length > 0 {
+//			w.token = w.tokenList[0]
+//		}
+//
+//		// if !strings.HasSuffix(connect, "http") {
+//		//	connect = fmt.Sprintf("http://%s", connect)
+//		// }
+//
+//		w.registUrl = fmt.Sprintf(urlModel, connect, "regist")
+//		w.heartUrl = fmt.Sprintf(urlModel, connect, "heartbeat")
+//		w.taskUrl = fmt.Sprintf(urlModel_, connect, "fetchOneagent", w.daemonId)
+//		w.returnTaskUrl = fmt.Sprintf(urlModel_, connect, "report", w.daemonId)
+//		w.installAgentUrl = fmt.Sprintf(urlModel, connect, "installAgents")
+//		w.connectTestUrl = fmt.Sprintf(urlModel, connect, "ifConnectTest")
+//		w.installReport = fmt.Sprintf(urlModel, connect, "installreport")
+//		log.Debugf("registerUrl:%s, heartUrl:%s, taskUrl:%s, returnTaskUrl:%s, installAgentUrl:%s, connectTest:%s,  installReport: %s",
+//			w.registUrl, w.heartUrl, w.taskUrl, w.returnTaskUrl, w.installAgentUrl, w.connectTestUrl, w.installReport)
+//		return w.currentIndex
+//	}
+//
+//	func (w *serverHTTPWorker) Register(r RegistRequest) (RegistorRespData, error) {
+//		response := RegistorRespData{}
+//		data, err := json.Marshal(r)
+//		if err != nil {
+//			log.Errorf("register marshal request error:%s.", err)
+//			return response, err
+//		}
+//		log.Infof("register request %s", string(data))
+//		result, err := w.requestServer(registerModel, data)
+//		if err != nil {
+//			return response, err
+//		}
+//
+//		err = json.Unmarshal(result, &response)
+//		if err != nil {
+//			log.Errorf("register unmarshal response error:%s.", err)
+//			return response, err
+//		}
+//
+//		return response, nil
+//	}
+//
+//	func (w *serverHTTPWorker) InstallReport(r ReportRequest) error {
+//		data, err := json.Marshal(r)
+//		if err != nil {
+//			log.Errorf("install report marshal request error:%s.", err)
+//			return err
+//		}
+//		log.Infof("install report request %s", string(data))
+//		if _, err := w.requestServer(installReportModel, data); err != nil {
+//			return err
+//		}
+//		return nil
+//	}
+//
+//	func (w *serverHTTPWorker) ConnectTest(r RegistRequest) error {
+//		data, err := json.Marshal(r)
+//		if err != nil {
+//			log.Errorf("ConnectTest serialize ConnectTest request error:%s.", err)
+//			return err
+//		}
+//		log.Infof("ConnectTest request:%s.", string(data))
+//		_, err = w.requestServer(connectTestModel, data)
+//		return err
+//	}
+//
+//	func (w *serverHTTPWorker) SyncAgentInfo(r RegistRequest) ([]AgentList, error) {
+//		response := RegistorRespData{}
+//		data, err := json.Marshal(r)
+//		if err != nil {
+//			log.Errorf("sync agent runner serialize error:%s.", err)
+//			return response.Agents, err
+//		}
+//		log.Infof("sync agent runner request:%s.", string(data))
+//		result, err := w.requestServer(syncAgentInfoModel, data)
+//		if err != nil {
+//			return response.Agents, err
+//		}
+//
+//		err = json.Unmarshal(result, &response)
+//		if err != nil {
+//			log.Errorf("sync agent runner parser error return:%s.", err)
+//			return response.Agents, err
+//		}
+//
+//		return response.Agents, nil
+//	}
+//
+//	func (w *serverHTTPWorker) Heart(request DaemonHeartBeatInfo) (map[string]HeartConfContent, error) {
+//		log.Infof("heartbeat request:%v.", request)
+//		response := HB_Config{}
+//		// Mac本上: P_resource_occupancy[0].cpu -> +Inf  ->  json: unsupported value: +Inf
+//		data, err := json.Marshal(request)
+//		if err != nil {
+//			log.Errorf("heartbeat marshal request error:%s.", err)
+//			return response.HB_Config_Detail, err
+//		}
+//
+//		log.Infof("heartbeat request:%s.", string(data))
+//		result, err := w.requestServer(heartModel, data)
+//		if err != nil {
+//			return response.HB_Config_Detail, err
+//		}
+//
+//		err = json.Unmarshal(result, &response)
+//		if err != nil {
+//			log.Errorf("heartbeat unmarshal response error:%s.", err)
+//			return response.HB_Config_Detail, err
+//		}
+//		return response.HB_Config_Detail, nil
+//	}
+//
+//	func (w *serverHTTPWorker) ReceiveTask(request []interface{}) (tasks []Task, err error) {
+//		defer func() {
+//			if err := recover(); err != nil {
+//				log.Errorf("get task panic, error is <%v>", err)
+//			}
+//		}()
+//		data, err := json.Marshal(request)
+//		if err != nil {
+//			log.Errorf("get task serialize get task request, error is <%s>", err)
+//			return nil, err
+//		}
+//
+//		log.Infof("get task request:%s.", string(data))
+//
+//		result := RespJson{}
+//		// index := w.getIndex()
+//		// for i := 0; i <= 0; i++ {
+//		url := w.taskUrl
+//		// fmt.Println("ReceiveTask:", url)
+//		response, err := w.proxyClient.ProxyRequest(http.MethodPost, url, data,
+//			map[string]string{"token": w.token, "Content-Type": "application/json"})
+//		// response, err := HTTPRequest(http.MethodPost, url, data, w.token)
+//		if err != nil {
+//			log.WithError(err).Errorf("proxy server error")
+//			return nil, fmt.Errorf("proxy server <%s> error is <%v>", url, err)
+//		}
+//		log.Infof("url is < %s >, tasks response <%s>", url, string(response))
+//
+//		err = json.Unmarshal(response, &result)
+//		if err != nil {
+//			log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
+//				url, err, string(response))
+//			return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>",
+//				url, err, string(response))
+//
+//		}
+//
+//		if result.Code != 100000 {
+//			log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
+//				url, result.Code, result.Msg)
+//			return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>",
+//				url, result.Code, result.Msg)
+//		}
+//
+//		if err != nil {
+//			log.Errorf("failed with request task, error is <%v>", err)
+//			return nil, err
+//		}
+//
+//		if result.Encryption && config.Sm2PrivateKey != nil && result.Data != nil {
+//			base64Data := bytes.TrimSuffix(result.Data, []byte("\""))
+//			base64Data = bytes.TrimPrefix(base64Data, []byte("\""))
+//
+//			decodeData, _ := base64.StdEncoding.DecodeString(string(base64Data))
+//			plainText, err := sm2.Decrypt(config.Sm2PrivateKey, decodeData, sm2.C1C3C2)
+//			if err != nil {
+//				log.Errorf("failed with decrypt asn1 from pem, data is <%s>, error is <%v>", string(result.Data), err)
+//				return nil, err
+//			}
+//
+//			log.Infof("task info is <%s>", string(plainText))
+//			err = json.Unmarshal(plainText, &tasks)
+//			if err != nil {
+//				log.Errorf("failed with json unmarshal, task info is <%s>,  error is <%v>", string(plainText), err)
+//				return nil, err
+//			}
+//
+//			return tasks, nil
+//		}
+//
+//		err = json.Unmarshal(result.Data, &tasks)
+//		if err != nil {
+//			log.Errorf("failed with json unmarshal, error is <%v>", err)
+//			return nil, err
+//		}
+//		return tasks, nil
+//	}
+//
+//	func (w *serverHTTPWorker) ReportTaskResult(taskResult []TaskResult) ([]byte, error) {
+//		data, err := json.Marshal(taskResult)
+//		if err != nil {
+//			log.Errorf("report task result serialize error:%s.", err)
+//			return nil, err
+//		}
+//		log.Infof("report task result request:%s.", string(data))
+//
+//		data, err = w.requestServer(reportTaskResultModel, data)
+//		return data, err
+//	}
+//
+//	func (w *serverHTTPWorker) getIndex() int {
+//		w.mux.Lock()
+//		defer w.mux.Unlock()
+//		return w.currentIndex
+//	}
+func (w *ServerHTTPWorker) requestServer(uri string, data interface{}) ([]byte, error) {
+	byteData, err := json.Marshal(data)
+	if err != nil {
+		log.Errorf("heartbeat marshal request error:%s.", err)
+		return byteData, err
+	}
+
+	response, err := w.proxyClient.ProxyRequest(http.MethodPost, uri, byteData,
+		map[string]string{"Content-Type": "application/json"})
+	if err != nil {
+		log.WithError(err).Errorf("proxy server error")
+		// index = w.updateUrl(index)
+		return nil, fmt.Errorf("proxy server %s error is <%v>", uri, err)
+	}
+
+	log.Infof("url %s response %s", uri, string(response))
+
+	result := RespJson{}
+	err = json.Unmarshal(response, &result)
+	if err != nil {
+		log.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err, string(response))
+		// index = w.updateUrl(index)
+		return nil, fmt.Errorf("failed with unmarshal server <%s>, result is <%s>, error is <%v>", uri, err,
+			string(response))
+	}
+
+	if result.Code != 1000 {
+		// index = w.updateUrl(index)
+		log.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code, result.Msg)
+		return nil, fmt.Errorf("failed with request server <%s>, code is <%d>, error is <%v>", uri, result.Code,
+			result.Msg)
+	}
+
+	return result.Data, nil
+}
+
+//
+//func (w *serverHTTPWorker) SetToken(token string) {
+//	w.token = token
+//}
+//
+//func (w *serverHTTPWorker) GetInfo() (string, string) {
+//	return w.doccServer, w.token
+//}