|
|
@@ -244,12 +244,18 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
|
|
|
//trace := tracing.NewTrace(string(c.id), conn.ActualDest)
|
|
|
switch r.Protocol {
|
|
|
+ /**
|
|
|
+ * HTTP
|
|
|
+ */
|
|
|
case l7.ProtocolHTTP:
|
|
|
if c.AppInfo.AppName != "" {
|
|
|
klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
|
|
|
}
|
|
|
|
|
|
stats.observe(r.Status.Http(), "", r.Duration)
|
|
|
+ /**
|
|
|
+ * HTTP2
|
|
|
+ */
|
|
|
case l7.ProtocolHTTP2:
|
|
|
if conn.http2Parser == nil {
|
|
|
conn.http2Parser = l7.NewHttp2Parser()
|
|
|
@@ -259,6 +265,9 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
stats.observe(req.Status.Http(), "", req.Duration)
|
|
|
//trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
|
|
|
}
|
|
|
+ /**
|
|
|
+ * PostgreSQL
|
|
|
+ */
|
|
|
case l7.ProtocolPostgres:
|
|
|
if r.Method != l7.MethodStatementClose {
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
@@ -275,7 +284,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
query := conn.postgresParser.Parse(r.Payload)
|
|
|
//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
|
|
|
if c.AppInfo.AppName != "" {
|
|
|
- klog.Debugf("[%s] ->>>>> Postgres -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
|
|
|
+ klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
|
|
|
}
|
|
|
//apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
@@ -285,10 +294,13 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
if err == nil {
|
|
|
//apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
|
|
|
//apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
|
|
|
- apmTrace.SQLTraceQueryEvent(l7.ProtocolPostgres, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
|
|
|
+ apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
|
|
|
c.SendEvent(apmTrace, r.TraceId)
|
|
|
}
|
|
|
}
|
|
|
+ /**
|
|
|
+ * Mysql
|
|
|
+ */
|
|
|
case l7.ProtocolMysql:
|
|
|
if r.Method != l7.MethodStatementClose {
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
@@ -300,7 +312,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
|
|
|
//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
|
|
|
if c.AppInfo.AppName != "" {
|
|
|
- klog.Debugf("[%s] ->>>>> Mysql -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
|
|
|
+ klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
|
|
|
}
|
|
|
//apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
@@ -320,7 +332,9 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
c.SendEvent(apmTrace, r.TraceId)
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+ /**
|
|
|
+ * DM (达梦数据库)
|
|
|
+ */
|
|
|
case l7.ProtocolDM:
|
|
|
//统计dm的query次数
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
@@ -331,24 +345,42 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
}
|
|
|
query := conn.dmParser.Parse(r.Payload, r.StatementId)
|
|
|
if c.AppInfo.AppName != "" {
|
|
|
- klog.Debugf("[%s] ->>>>> Mysql -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
|
|
|
+ klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
|
|
|
}
|
|
|
|
|
|
apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
if err == nil {
|
|
|
//apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
|
|
|
- apmTrace.SQLTraceQueryEvent(l7.ProtocolDM, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
|
|
|
+ apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
|
|
|
c.SendEvent(apmTrace, r.TraceId)
|
|
|
}
|
|
|
}
|
|
|
+ /**
|
|
|
+ * Memcached
|
|
|
+ */
|
|
|
case l7.ProtocolMemcached:
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
-
|
|
|
+ cmd, items := l7.ParseMemcached(r.Payload)
|
|
|
+ if c.AppInfo.AppName != "" {
|
|
|
+ klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
|
|
|
+ }
|
|
|
+ apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
+ if err == nil {
|
|
|
+ statement := cmd
|
|
|
+ if len(items) == 1 {
|
|
|
+ statement += " " + items[0]
|
|
|
+ } else if len(items) > 1 {
|
|
|
+ joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
|
|
|
+ statement += " " + joined
|
|
|
+ }
|
|
|
+ apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.ActualDest)
|
|
|
+ c.SendEvent(apmTrace, r.TraceId)
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- //cmd, items := l7.ParseMemcached(r.Payload)
|
|
|
- //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
|
|
|
+ /**
|
|
|
+ * Redis
|
|
|
+ */
|
|
|
case l7.ProtocolRedis:
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
@@ -357,17 +389,23 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
//fmt.Println("args", args)
|
|
|
//apmTrace, ok := c.getTrace(r.TraceId)
|
|
|
if c.AppInfo.AppName != "" {
|
|
|
- klog.Debugf("[%s] ->>>>> Redis -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, cmd)
|
|
|
+ klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
|
|
|
}
|
|
|
|
|
|
apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
if err == nil {
|
|
|
- //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
|
|
|
- apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
|
|
|
+ statement := cmd
|
|
|
+ if args != "" {
|
|
|
+ statement += " " + args
|
|
|
+ }
|
|
|
+ apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.ActualDest)
|
|
|
c.SendEvent(apmTrace, r.TraceId)
|
|
|
}
|
|
|
}
|
|
|
//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
|
|
|
+ /**
|
|
|
+ * gRPC
|
|
|
+ */
|
|
|
case l7.ProtocolGrpc:
|
|
|
klog.Debugln("enter the l7.ProtocolGrpc")
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
@@ -378,6 +416,9 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
c.SendEvent(apmTrace, r.TraceId)
|
|
|
}
|
|
|
}
|
|
|
+ /**
|
|
|
+ * MongoDB
|
|
|
+ */
|
|
|
case l7.ProtocolMongo:
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
@@ -388,19 +429,28 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
|
|
|
|
|
|
apmTrace, err := c.getOrInitTrace(r.TraceId)
|
|
|
if err == nil {
|
|
|
- //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
|
|
|
- apmTrace.MongoTraceQueryEvent(query, r, conn.ActualDest)
|
|
|
+ // MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
|
|
|
+ apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.ActualDest)
|
|
|
c.SendEvent(apmTrace, r.TraceId)
|
|
|
}
|
|
|
}
|
|
|
+ /**
|
|
|
+ * Kafka / Cassandra
|
|
|
+ */
|
|
|
case l7.ProtocolKafka, l7.ProtocolCassandra:
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
}
|
|
|
+ /**
|
|
|
+ * RabbitMQ / NATS
|
|
|
+ */
|
|
|
case l7.ProtocolRabbitmq, l7.ProtocolNats:
|
|
|
stats.observe(r.Status.String(), r.Method.String(), 0)
|
|
|
if c.l7Attach && c.valuableTrace(r.TraceId) {
|
|
|
}
|
|
|
+ /**
|
|
|
+ * Dubbo2
|
|
|
+ */
|
|
|
case l7.ProtocolDubbo2:
|
|
|
stats.observe(r.Status.String(), "", r.Duration)
|
|
|
if c.l7Attach && c.valuableTrace(r.TraceId) {
|