|
|
@@ -4,13 +4,13 @@ import (
|
|
|
"bufio"
|
|
|
"debug/elf"
|
|
|
"fmt"
|
|
|
- "github.com/coroot/coroot-node-agent/common"
|
|
|
"github.com/coroot/coroot-node-agent/ebpftracer"
|
|
|
"github.com/coroot/coroot-node-agent/ebpftracer/l7"
|
|
|
"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
|
|
|
"github.com/coroot/coroot-node-agent/proc"
|
|
|
"github.com/coroot/coroot-node-agent/tracing"
|
|
|
"github.com/coroot/coroot-node-agent/utils"
|
|
|
+ . "github.com/coroot/coroot-node-agent/utils/modelse"
|
|
|
"github.com/pkg/errors"
|
|
|
klog "github.com/sirupsen/logrus"
|
|
|
"inet.af/netaddr"
|
|
|
@@ -21,6 +21,8 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+const TRACE_STATUS = 1
|
|
|
+
|
|
|
func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
|
|
|
trace, ok := c.traceMap[traceId]
|
|
|
return trace, ok
|
|
|
@@ -86,12 +88,12 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
return c.onDNSRequest(r)
|
|
|
}
|
|
|
|
|
|
- if !c.valuableTrace(r.TraceId) {
|
|
|
- return nil
|
|
|
- }
|
|
|
+ //if !c.valuableTrace(r.TraceId) {
|
|
|
+ // return nil
|
|
|
+ //}
|
|
|
|
|
|
- if r.Protocol == l7.ProtocolTrace {
|
|
|
- if r.TraceStart == 1 {
|
|
|
+ if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ if r.TraceStart == TRACE_STATUS {
|
|
|
klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
|
|
|
trace, err := c.getOrInitTrace(r.TraceId)
|
|
|
if err == nil {
|
|
|
@@ -104,7 +106,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
- if r.TraceEnd == 1 {
|
|
|
+ if r.TraceEnd == TRACE_STATUS {
|
|
|
klog.Infof("====ProtocolTrace end==== %d %d", pid, r.TraceId)
|
|
|
trace, err := c.getOrInitTrace(r.TraceId)
|
|
|
if err == nil {
|
|
|
@@ -115,22 +117,16 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
}
|
|
|
}
|
|
|
if r.Protocol == l7.ProtocolHTTP {
|
|
|
- //stats.observe(r.Status.Http(), "", r.Duration)
|
|
|
- method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
|
|
|
- //trace.HttpRequest(method, path, r.Status, r.Duration)
|
|
|
-
|
|
|
- //apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
- //if ok {
|
|
|
- // apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
|
|
|
- //}
|
|
|
-
|
|
|
- apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
- fmt.Println("ProtocolHTTP-----", r.TraceId, err)
|
|
|
- if err == nil {
|
|
|
- apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
|
|
|
- c.SendEvent(apmTrace, r.TraceId)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
|
|
|
+ apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
+ //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
|
|
|
+ if err == nil {
|
|
|
+ apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
|
|
|
+ c.SendEvent(apmTrace, r.TraceId)
|
|
|
+ }
|
|
|
}
|
|
|
- return nil
|
|
|
+ //return nil
|
|
|
}
|
|
|
conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
|
|
|
//fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
|
|
|
@@ -142,19 +138,10 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
return nil
|
|
|
}
|
|
|
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
|
|
|
- trace := tracing.NewTrace(string(c.id), conn.ActualDest)
|
|
|
+ //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
|
|
|
switch r.Protocol {
|
|
|
case l7.ProtocolHTTP:
|
|
|
- //fmt.Println("l7.ProtocolHTTP", r.TraceId)
|
|
|
- ////stats.observe(r.Status.Http(), "", r.Duration)
|
|
|
- //method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
|
|
|
- ////trace.HttpRequest(method, path, r.Status, r.Duration)
|
|
|
- //
|
|
|
- //apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
- //if ok {
|
|
|
- // apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
|
|
|
- //}
|
|
|
-
|
|
|
+ stats.observe(r.Status.Http(), "", r.Duration)
|
|
|
case l7.ProtocolHTTP2:
|
|
|
if conn.http2Parser == nil {
|
|
|
conn.http2Parser = l7.NewHttp2Parser()
|
|
|
@@ -162,64 +149,80 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
|
|
|
for _, req := range requests {
|
|
|
stats.observe(req.Status.Http(), "", req.Duration)
|
|
|
- trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
|
|
|
+ //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
|
|
|
}
|
|
|
case l7.ProtocolPostgres:
|
|
|
- //if r.Method != l7.MethodStatementClose {
|
|
|
- // stats.observe(r.Status.String(), "", r.Duration)
|
|
|
- //}
|
|
|
+ if r.Method != l7.MethodStatementClose {
|
|
|
+ stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ }
|
|
|
//if conn.postgresParser == nil {
|
|
|
// conn.postgresParser = l7.NewPostgresParser()
|
|
|
//}
|
|
|
//query := conn.postgresParser.Parse(r.Payload)
|
|
|
//trace.PostgresQuery(query, r.Status.Error(), r.Duration)
|
|
|
case l7.ProtocolMysql:
|
|
|
- //fmt.Println("mysql mysql")
|
|
|
- //fmt.Println(conn)
|
|
|
if r.Method != l7.MethodStatementClose {
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
}
|
|
|
- if conn.mysqlParser == nil {
|
|
|
- conn.mysqlParser = l7.NewMysqlParser()
|
|
|
- }
|
|
|
- query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
|
|
|
- //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
|
|
|
-
|
|
|
- //apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
- apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
- //fmt.Println("mysql r.TraceId:", r.TraceId)
|
|
|
- //fmt.Println("ok:", ok)
|
|
|
- //fmt.Println("traceMap:", len(c.traceMap))
|
|
|
- if err == nil {
|
|
|
- //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
|
|
|
- apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
|
|
|
- c.SendEvent(apmTrace, r.TraceId)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ if conn.mysqlParser == nil {
|
|
|
+ conn.mysqlParser = l7.NewMysqlParser()
|
|
|
+ }
|
|
|
+ query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
|
|
|
+ //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
|
|
|
+
|
|
|
+ //apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
+ apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
+ //fmt.Println("mysql r.TraceId:", r.TraceId)
|
|
|
+ //fmt.Println("ok:", ok)
|
|
|
+ //fmt.Println("traceMap:", len(c.traceMap))
|
|
|
+ if err == nil {
|
|
|
+ //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
|
|
|
+ apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
|
|
|
+ c.SendEvent(apmTrace, r.TraceId)
|
|
|
+ }
|
|
|
}
|
|
|
case l7.ProtocolMemcached:
|
|
|
- //stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
//cmd, items := l7.ParseMemcached(r.Payload)
|
|
|
//trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
|
|
|
case l7.ProtocolRedis:
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
- cmd, args := l7.ParseRedis(r.Payload)
|
|
|
- fmt.Println("cmd", cmd)
|
|
|
- fmt.Println("args", args)
|
|
|
- //apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
- apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
- if err == nil {
|
|
|
- //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
|
|
|
- apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
|
|
|
- c.SendEvent(apmTrace, r.TraceId)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ cmd, args := l7.ParseRedis(r.Payload)
|
|
|
+ //fmt.Println("cmd", cmd)
|
|
|
+ //fmt.Println("args", args)
|
|
|
+ //apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
+ apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
+ if err == nil {
|
|
|
+ //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
|
|
|
+ apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
|
|
|
+ c.SendEvent(apmTrace, r.TraceId)
|
|
|
+ }
|
|
|
}
|
|
|
//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
|
|
|
case l7.ProtocolMongo:
|
|
|
- //stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ }
|
|
|
//query := l7.ParseMongo(r.Payload)
|
|
|
//trace.MongoQuery(query, r.Status.Error(), r.Duration)
|
|
|
case l7.ProtocolKafka, l7.ProtocolCassandra:
|
|
|
- //stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ }
|
|
|
case l7.ProtocolRabbitmq, l7.ProtocolNats:
|
|
|
- //stats.observe(r.Status.String(), r.Method.String(), 0)
|
|
|
+ stats.observe(r.Status.String(), r.Method.String(), 0)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ }
|
|
|
+ case l7.ProtocolDubbo2:
|
|
|
+ stats.observe(r.Status.String(), "", r.Duration)
|
|
|
+ if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
+ }
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
@@ -241,9 +244,9 @@ func (c *Container) buildIDs(pid uint32) bool {
|
|
|
c.AppInfo.Sn = ip.String()
|
|
|
c.AppInfo.Sport = int(port)
|
|
|
strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
|
|
|
- c.instanceID.IntVal, _ = strInstanceID.ToInt64()
|
|
|
- c.instanceID.HashtVal = strInstanceID.ToHashByte()
|
|
|
- c.AppInfo.InstanceId = c.instanceID.IntVal
|
|
|
+ c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
|
|
|
+ c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
|
|
|
+ //c.AppInfo.InstanceId = c.instanceID.IntVal
|
|
|
strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
|
|
|
c.AppInfo.AgentId, _ = strAgentID.ToInt64()
|
|
|
c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
|
|
|
@@ -414,22 +417,24 @@ func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tr
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func (c *Container) GetAppInfo() common.AppInfo {
|
|
|
+func (c *Container) GetAppInfo() AppInfo {
|
|
|
return c.AppInfo
|
|
|
}
|
|
|
|
|
|
-func (c *Container) eventReady() {
|
|
|
+// 可注入前置
|
|
|
+func (c *Container) checkEventReady() bool {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
- c.l7EventReady = true
|
|
|
+ return c.l7EventReady
|
|
|
}
|
|
|
|
|
|
-func (c *Container) checkEventReady() bool {
|
|
|
+func (c *Container) eventReady() {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
- return c.l7EventReady
|
|
|
+ c.l7EventReady = true
|
|
|
}
|
|
|
|
|
|
+// uprobe前置
|
|
|
func (c *Container) checkL7AttachReady() bool {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
@@ -437,6 +442,8 @@ func (c *Container) checkL7AttachReady() bool {
|
|
|
}
|
|
|
|
|
|
func (c *Container) l7AttachSuccess() {
|
|
|
+ c.lock.Lock()
|
|
|
+ defer c.lock.Unlock()
|
|
|
c.l7Attach = true
|
|
|
}
|
|
|
|
|
|
@@ -467,7 +474,6 @@ func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool {
|
|
|
klog.WithField("pid", pid).
|
|
|
WithField("ruleVal", ruleVal).
|
|
|
WithField("cmdline", cmdline).
|
|
|
- WithField("event ready", c.checkEventReady()).
|
|
|
Infoln("[verify] check successful.")
|
|
|
return true
|
|
|
}
|
|
|
@@ -482,9 +488,10 @@ func (c *Container) detachUprobes(pid uint32) {
|
|
|
// close uprobe
|
|
|
if p := c.processes[pid]; p != nil {
|
|
|
if len(p.uprobes) > 0 {
|
|
|
- fmt.Println("卸载---", pid)
|
|
|
+ klog.Infof("detachUprobes", pid)
|
|
|
p.DynamicClose()
|
|
|
c.l7Attach = false
|
|
|
+ c.AppInfo = AppInfo{}
|
|
|
}
|
|
|
}
|
|
|
}
|