|
|
@@ -14,6 +14,7 @@ import (
|
|
|
"go.opentelemetry.io/otel/trace"
|
|
|
"inet.af/netaddr"
|
|
|
"strconv"
|
|
|
+ "strings"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
)
|
|
|
@@ -281,6 +282,10 @@ func (t *Trace) GetSpan() trace.Span {
|
|
|
|
|
|
func (t *Trace) createTraceEvent(name string, eventType int, l7Type int, attrs ...attribute.KeyValue) {
|
|
|
t.span.AddEventApm(name, eventType, l7Type, trace.WithAttributes(attrs...))
|
|
|
+ //atomic.AddUint32(t.currenEventCount, 1)
|
|
|
+}
|
|
|
+
|
|
|
+func (t *Trace) addEvent() {
|
|
|
atomic.AddUint32(t.currenEventCount, 1)
|
|
|
}
|
|
|
|
|
|
@@ -309,71 +314,29 @@ func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration
|
|
|
)
|
|
|
}
|
|
|
|
|
|
-func (t *Trace) MysqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
- fmt.Println("query", query)
|
|
|
- if t == nil {
|
|
|
- return
|
|
|
+func isCURDOperation(q string) bool {
|
|
|
+ if len(q) < 6 {
|
|
|
+ return false
|
|
|
}
|
|
|
-
|
|
|
- var attr []attribute.KeyValue
|
|
|
- attr = append(attr,
|
|
|
- semconv.DBSystemMySQL,
|
|
|
- semconv.DBStatement(query),
|
|
|
- semconv.NetPeerName(destination.IP().String()),
|
|
|
- semconv.NetPeerPort(int(destination.Port())),
|
|
|
- attribute.String("sql.src_addr", r.ComponentSAddr.String()),
|
|
|
- attribute.String("sql.destination_addr", r.ComponentDAddr.String()),
|
|
|
- )
|
|
|
- t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
|
|
|
-
|
|
|
- attr = append(attr,
|
|
|
- attribute.Bool("sql.exception", r.Status.Error()),
|
|
|
- )
|
|
|
- t.createTraceEvent(l7.ProtocolMysql.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolMysql), attr...)
|
|
|
+ q = strings.ToUpper(q[:6])
|
|
|
+ return q == "SELECT" || q == "INSERT" || q == "UPDATE" || q == "DELETE"
|
|
|
}
|
|
|
|
|
|
-func (t *Trace) PostGreSqlTraceQuery(query string, error bool, duration time.Duration, destination netaddr.IPPort) {
|
|
|
- if t == nil || query == "" {
|
|
|
- return
|
|
|
- }
|
|
|
- t.createTraceSpan(l7.ProtocolPostgres.String(), duration, error,
|
|
|
- semconv.DBSystemPostgreSQL,
|
|
|
- semconv.DBStatement(query),
|
|
|
- semconv.NetPeerName(destination.IP().String()),
|
|
|
- semconv.NetPeerPort(int(destination.Port())),
|
|
|
- )
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Trace) PostGreSqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
+func (t *Trace) SQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.KeyValue, query string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
fmt.Println("query", query)
|
|
|
if t == nil {
|
|
|
return
|
|
|
}
|
|
|
+ t.addEvent()
|
|
|
|
|
|
- var attr []attribute.KeyValue
|
|
|
- attr = append(attr,
|
|
|
- semconv.DBSystemPostgreSQL,
|
|
|
- semconv.DBStatement(query),
|
|
|
- semconv.NetPeerName(destination.IP().String()),
|
|
|
- semconv.NetPeerPort(int(destination.Port())),
|
|
|
- attribute.String("sql.src_addr", r.ComponentSAddr.String()),
|
|
|
- attribute.String("sql.destination_addr", r.ComponentDAddr.String()),
|
|
|
- )
|
|
|
- t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
|
|
|
- attr = append(attr,
|
|
|
- attribute.Bool("sql.exception", r.Status.Error()),
|
|
|
- )
|
|
|
- t.createTraceEvent(l7.ProtocolPostgres.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolPostgres), attr...)
|
|
|
-}
|
|
|
-
|
|
|
-func (t *Trace) DmTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
- if t == nil || query == "" {
|
|
|
+ // 只保留增删改查基本操作
|
|
|
+ if !isCURDOperation(query) {
|
|
|
return
|
|
|
}
|
|
|
- l7Type := int(l7.ProtocolDM)
|
|
|
+
|
|
|
var attr []attribute.KeyValue
|
|
|
attr = append(attr,
|
|
|
- semconv.DBSystemDaMengDB,
|
|
|
+ semconvVal,
|
|
|
semconv.DBStatement(query),
|
|
|
semconv.NetPeerName(destination.IP().String()),
|
|
|
semconv.NetPeerPort(int(destination.Port())),
|
|
|
@@ -385,26 +348,109 @@ func (t *Trace) DmTraceQueryEvent(query string, r *l7.RequestData, destination n
|
|
|
attr = append(attr,
|
|
|
attribute.Bool("sql.exception", r.Status.Error()),
|
|
|
)
|
|
|
- t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), l7Type, attr...)
|
|
|
+ t.createTraceEvent(l7Type.String(), ebpftracer.EventTypeL7Request.Int(), l7Type.Int(), attr...)
|
|
|
}
|
|
|
|
|
|
-func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Duration) {
|
|
|
- if t == nil || cmd == "" {
|
|
|
- return
|
|
|
- }
|
|
|
- statement := cmd
|
|
|
- if args != "" {
|
|
|
- statement += " " + args
|
|
|
- }
|
|
|
- t.createTraceSpan(l7.ProtocolRedis.String(), duration, error,
|
|
|
- semconv.DBSystemRedis,
|
|
|
- semconv.DBOperation(cmd),
|
|
|
- semconv.DBStatement(statement),
|
|
|
- )
|
|
|
-}
|
|
|
+//func (t *Trace) MysqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
+// fmt.Println("query", query)
|
|
|
+// if t == nil {
|
|
|
+// return
|
|
|
+// }
|
|
|
+//
|
|
|
+// var attr []attribute.KeyValue
|
|
|
+// attr = append(attr,
|
|
|
+// semconv.DBSystemMySQL,
|
|
|
+// semconv.DBStatement(query),
|
|
|
+// semconv.NetPeerName(destination.IP().String()),
|
|
|
+// semconv.NetPeerPort(int(destination.Port())),
|
|
|
+// attribute.String("sql.src_addr", r.ComponentSAddr.String()),
|
|
|
+// attribute.String("sql.destination_addr", r.ComponentDAddr.String()),
|
|
|
+// )
|
|
|
+// t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
|
|
|
+//
|
|
|
+// attr = append(attr,
|
|
|
+// attribute.Bool("sql.exception", r.Status.Error()),
|
|
|
+// )
|
|
|
+// t.createTraceEvent(l7.ProtocolMysql.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolMysql), attr...)
|
|
|
+//}
|
|
|
+
|
|
|
+//func (t *Trace) PostGreSqlTraceQuery(query string, error bool, duration time.Duration, destination netaddr.IPPort) {
|
|
|
+// if t == nil || query == "" {
|
|
|
+// return
|
|
|
+// }
|
|
|
+// t.createTraceSpan(l7.ProtocolPostgres.String(), duration, error,
|
|
|
+// semconv.DBSystemPostgreSQL,
|
|
|
+// semconv.DBStatement(query),
|
|
|
+// semconv.NetPeerName(destination.IP().String()),
|
|
|
+// semconv.NetPeerPort(int(destination.Port())),
|
|
|
+// )
|
|
|
+//}
|
|
|
+
|
|
|
+//func (t *Trace) PostGreSqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
+// fmt.Println("query", query)
|
|
|
+// if t == nil {
|
|
|
+// return
|
|
|
+// }
|
|
|
+//
|
|
|
+// var attr []attribute.KeyValue
|
|
|
+// attr = append(attr,
|
|
|
+// semconv.DBSystemPostgreSQL,
|
|
|
+// semconv.DBStatement(query),
|
|
|
+// semconv.NetPeerName(destination.IP().String()),
|
|
|
+// semconv.NetPeerPort(int(destination.Port())),
|
|
|
+// attribute.String("sql.src_addr", r.ComponentSAddr.String()),
|
|
|
+// attribute.String("sql.destination_addr", r.ComponentDAddr.String()),
|
|
|
+// )
|
|
|
+// t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
|
|
|
+// attr = append(attr,
|
|
|
+// attribute.Bool("sql.exception", r.Status.Error()),
|
|
|
+// )
|
|
|
+// t.createTraceEvent(l7.ProtocolPostgres.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolPostgres), attr...)
|
|
|
+//}
|
|
|
+
|
|
|
+//func (t *Trace) DmTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
+// if t == nil || query == "" {
|
|
|
+// return
|
|
|
+// }
|
|
|
+// l7Type := int(l7.ProtocolDM)
|
|
|
+// var attr []attribute.KeyValue
|
|
|
+// attr = append(attr,
|
|
|
+// semconv.DBSystemDaMengDB,
|
|
|
+// semconv.DBStatement(query),
|
|
|
+// semconv.NetPeerName(destination.IP().String()),
|
|
|
+// semconv.NetPeerPort(int(destination.Port())),
|
|
|
+// attribute.String("sql.src_addr", r.ComponentSAddr.String()),
|
|
|
+// attribute.String("sql.destination_addr", r.ComponentDAddr.String()),
|
|
|
+// )
|
|
|
+// t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
|
|
|
+//
|
|
|
+// attr = append(attr,
|
|
|
+// attribute.Bool("sql.exception", r.Status.Error()),
|
|
|
+// )
|
|
|
+// t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), l7Type, attr...)
|
|
|
+//}
|
|
|
+
|
|
|
+//func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Duration) {
|
|
|
+// if t == nil || cmd == "" {
|
|
|
+// return
|
|
|
+// }
|
|
|
+// statement := cmd
|
|
|
+// if args != "" {
|
|
|
+// statement += " " + args
|
|
|
+// }
|
|
|
+// t.createTraceSpan(l7.ProtocolRedis.String(), duration, error,
|
|
|
+// semconv.DBSystemRedis,
|
|
|
+// semconv.DBOperation(cmd),
|
|
|
+// semconv.DBStatement(statement),
|
|
|
+// )
|
|
|
+//}
|
|
|
|
|
|
func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destination netaddr.IPPort) {
|
|
|
- if t == nil || cmd == "" {
|
|
|
+ if t == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ t.addEvent()
|
|
|
+ if cmd == "" {
|
|
|
return
|
|
|
}
|
|
|
statement := cmd
|
|
|
@@ -430,7 +476,7 @@ func (t *Trace) DNSTraceQueryEvent(r *l7.RequestData, _type, fqdn string) {
|
|
|
if t == nil {
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
+ t.addEvent()
|
|
|
var attr []attribute.KeyValue
|
|
|
attr = append(attr,
|
|
|
attribute.String("dns.type", _type),
|
|
|
@@ -464,7 +510,11 @@ func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, r *l7.Req
|
|
|
|
|
|
// 新增事件处理
|
|
|
func (t *Trace) HttpTraceRequestEvent(method, path, ip string, port uint16, r *l7.RequestData) {
|
|
|
- if t == nil || method == "" {
|
|
|
+ if t == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ t.addEvent()
|
|
|
+ if method == "" {
|
|
|
return
|
|
|
}
|
|
|
assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
|
|
|
@@ -491,7 +541,11 @@ func (t *Trace) HttpTraceRequestEvent(method, path, ip string, port uint16, r *l
|
|
|
}
|
|
|
|
|
|
func (t *Trace) FuncTraceQuery(funcname string, duration time.Duration, start uint64, end uint64) {
|
|
|
- if t == nil || funcname == "" {
|
|
|
+ if t == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ t.addEvent()
|
|
|
+ if funcname == "" {
|
|
|
return
|
|
|
}
|
|
|
t.createTraceSpanNoTime2(funcname, duration, false, start, end)
|