| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package containers
- import (
- "fmt"
- "github.com/coroot/coroot-node-agent/ebpftracer/l7"
- "github.com/coroot/coroot-node-agent/tracing"
- "inet.af/netaddr"
- )
- func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
- trace, ok := c.traceMap[traceId]
- return trace, ok
- }
- func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
- method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
- ip, err := netaddr.ParseIP(hostIp)
- if err != nil {
- return fmt.Errorf("host ip error")
- }
- addr := netaddr.IPPortFrom(ip, port)
- trace := tracing.NewTrace(string(c.id), addr)
- if trace == nil {
- return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
- }
- c.traceMap[traceId] = trace
- trace.TraceStart(method, path, r.Status, r.Duration)
- return nil
- }
- func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
- c.lock.Lock()
- defer c.lock.Unlock()
- if r.Protocol == l7.ProtocolTrace {
- //fmt.Println("r.TraceStart:", r.TraceStart)
- //fmt.Println("r.TraceEnd:", r.TraceEnd)
- if r.TraceStart == 1 {
- //fmt.Println("====ProtocolTrace start1====", r.TraceId)
- err := c.InitTrace(r.TraceId, r)
- if err != nil {
- fmt.Println(err)
- }
- //fmt.Println("init r.TraceId:", r.TraceId)
- //trace, _ := c.getTrace(r.TraceId)
- //fmt.Println("init traceId", trace)
- //stats.observe(r.Status.Http(), "", r.Duration)
- //method, path := l7.ParseHttp(r.Payload)
- //fmt.Println("r.Payload:", string(r.Payload))
- //fmt.Println("method:", method)
- //fmt.Println("path:", path)
- //fmt.Println("====ProtocolTrace start2====")
- return
- }
- if r.TraceEnd == 1 {
- //fmt.Println("r:", r)
- //fmt.Println("r.Payload:", string(r.Payload))
- //fmt.Println("====ProtocolTrace end2====")
- trace, ok := c.getTrace(r.TraceId)
- if ok {
- trace.TraceEnd(r)
- delete(c.traceMap, r.TraceId)
- }
- //fmt.Println("====ProtocolTrace end1====", ok, r.TraceId)
- return
- }
- }
- conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
- //fmt.Println(conn, pid, fd)
- if conn == nil {
- return
- }
- if timestamp != 0 && conn.Timestamp != timestamp {
- return
- }
- stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
- trace := tracing.NewTrace(string(c.id), conn.ActualDest)
- switch r.Protocol {
- case l7.ProtocolHTTP:
- stats.observe(r.Status.Http(), "", r.Duration)
- method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
- //trace.HttpRequest(method, path, r.Status, r.Duration)
- apmTrace, ok := c.getTrace(r.TraceId)
- if ok {
- apmTrace.HttpTraceRequest(method, path, hostIp, port, r.Status, r.Duration)
- }
- case l7.ProtocolHTTP2:
- if conn.http2Parser == nil {
- conn.http2Parser = l7.NewHttp2Parser()
- }
- requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
- for _, req := range requests {
- stats.observe(req.Status.Http(), "", req.Duration)
- trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
- }
- case l7.ProtocolPostgres:
- if r.Method != l7.MethodStatementClose {
- stats.observe(r.Status.String(), "", r.Duration)
- }
- if conn.postgresParser == nil {
- conn.postgresParser = l7.NewPostgresParser()
- }
- query := conn.postgresParser.Parse(r.Payload)
- trace.PostgresQuery(query, r.Status.Error(), r.Duration)
- case l7.ProtocolMysql:
- //fmt.Println("mysql mysql")
- //fmt.Println(conn)
- if r.Method != l7.MethodStatementClose {
- stats.observe(r.Status.String(), "", r.Duration)
- }
- if conn.mysqlParser == nil {
- conn.mysqlParser = l7.NewMysqlParser()
- }
- query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
- //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
- apmTrace, ok := c.getTrace(r.TraceId)
- //fmt.Println("mysql r.TraceId:", r.TraceId)
- //fmt.Println("ok:", ok)
- //fmt.Println("traceMap:", len(c.traceMap))
- if ok {
- apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
- }
- case l7.ProtocolMemcached:
- stats.observe(r.Status.String(), "", r.Duration)
- cmd, items := l7.ParseMemcached(r.Payload)
- trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
- case l7.ProtocolRedis:
- fmt.Println("redis redis")
- stats.observe(r.Status.String(), "", r.Duration)
- cmd, args := l7.ParseRedis(r.Payload)
- fmt.Println("cmd", cmd)
- fmt.Println("args", args)
- apmTrace, ok := c.getTrace(r.TraceId)
- fmt.Println("redis r.TraceId:", r.TraceId)
- fmt.Println("ok:", ok)
- fmt.Println("traceMap:", len(c.traceMap))
- if ok {
- apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
- }
- //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
- case l7.ProtocolMongo:
- stats.observe(r.Status.String(), "", r.Duration)
- query := l7.ParseMongo(r.Payload)
- trace.MongoQuery(query, r.Status.Error(), r.Duration)
- case l7.ProtocolKafka, l7.ProtocolCassandra:
- stats.observe(r.Status.String(), "", r.Duration)
- case l7.ProtocolRabbitmq, l7.ProtocolNats:
- stats.observe(r.Status.String(), r.Method.String(), 0)
- }
- }
|