| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094 |
- package containers
- import (
- "bufio"
- "bytes"
- "debug/elf"
- "fmt"
- "net"
- "os"
- "path"
- "sort"
- "strconv"
- "strings"
- "time"
- "github.com/cilium/ebpf/link"
- "github.com/coroot/coroot-node-agent/flags"
- "github.com/coroot/coroot-node-agent/ebpftracer"
- "github.com/coroot/coroot-node-agent/ebpftracer/l7"
- "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
- "github.com/coroot/coroot-node-agent/proc"
- "github.com/coroot/coroot-node-agent/tracing"
- "github.com/coroot/coroot-node-agent/utils"
- . "github.com/coroot/coroot-node-agent/utils/modelse"
- "github.com/pkg/errors"
- klog "github.com/sirupsen/logrus"
- semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
- "inet.af/netaddr"
- )
- func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
- trace, ok := c.traceMap[traceId]
- return trace, ok
- }
- func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
- c.traceMap[traceId] = trace
- }
- // 查询或创建trace信息
- func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
- trace, ok := c.getTrace(traceId)
- if !ok {
- //new trace
- trace = tracing.NewTraceFromEvent(string(c.id))
- //create TraceMap
- c.createTraceMap(traceId, trace)
- //create ParentSpan
- trace.CreateRootSpan(traceId)
- }
- return trace, nil
- }
// getGrpcServerNetworkInfo returns the network identity reported for gRPC
// server spans:
//   - the first IPv4 address configured on the "eth0" interface ("" if the
//     interface is missing, its addresses cannot be read, or it has no IPv4
//     address);
//   - the local port, taken from c.AppInfo.Sport and converted to uint16;
//   - the cgroup container ID ("" when the container has no cgroup).
func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
	var containerID string
	if c.cgroup != nil {
		containerID = c.cgroup.ContainerId
	}

	// firstIPv4 returns the textual form of the first IPv4 address on iface.
	firstIPv4 := func(iface net.Interface) string {
		addrs, err := iface.Addrs()
		if err != nil {
			return ""
		}
		for _, addr := range addrs {
			var ip net.IP
			switch a := addr.(type) {
			case *net.IPNet:
				if a != nil {
					ip = a.IP
				}
			case *net.IPAddr:
				ip = a.IP
			}
			if ip != nil && ip.To4() != nil {
				return ip.String()
			}
		}
		return ""
	}

	ipAddr := ""
	if ifaces, err := net.Interfaces(); err == nil {
		for _, iface := range ifaces {
			if iface.Name == "eth0" {
				ipAddr = firstIPv4(iface)
				break
			}
		}
	}
	klog.Debugf("grpc server ip %s", ipAddr)

	// The local port comes from the application's registered listen port.
	port := c.AppInfo.Sport
	klog.Debugf("grpc server port %d", port)
	return ipAddr, uint16(port), containerID
}
- // Deprecated: InitTrace not used
- //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
- // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
- // ip, err := netaddr.ParseIP(hostIp)
- // if err != nil {
- // //fmt.Println("host ip error")
- // hostIp = "127.0.0.1"
- // }
- // addr := netaddr.IPPortFrom(ip, port)
- // trace := tracing.NewTrace(string(c.id), addr)
- // if trace == nil {
- // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
- // }
- // c.traceMap[traceId] = trace
- //
- // trace.TraceStart(method, path, r.Status, r.Duration)
- // return nil
- //}
- // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
- // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
- func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
- if t.AllEventReady(traceID) {
- t.SendEvent()
- klog.Debugf("SendEvent %d", traceID)
- //fmt.Println(t.GetSpan())
- //fmt.Println("===============")
- delete(c.traceMap, traceID)
- }
- }
- func (c *Container) valuableTrace(traceID uint64) bool {
- return traceID != 0
- }
// onL7RequestApm handles one L7 request event captured for this container.
//
// It updates the per-destination L7 statistics. When APM tracing is active for
// the event (c.l7Attach set and a non-zero r.TraceId), it also records the
// matching span event on the request's trace and flushes the trace through
// SendEvent once the trace is complete. c.lock is held for the whole call.
//
// Processing order:
//   - DNS: handled by onDNSRequest, plus an optional DNS trace event. The
//     returned IP->FQDN mapping is the only non-nil result of this method.
//   - Trace start/end markers (TraceStart/TraceEnd == TRACE_STATUS): only the
//     entry/exit event is recorded and statistics are not touched.
//   - HTTP and gRPC client requests are traced before the connection lookup,
//     so the span is recorded even when the event is later dropped as stale.
//     Each such request is traced exactly once.
//   - Statistics, and the traces of all other protocols, need a connection
//     whose timestamp matches the event. An unknown pid/fd pair gets a
//     synthetic connection built from r.ComponentDAddr; that connection is not
//     stored in the map.
func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
	c.lock.Lock()
	defer c.lock.Unlock()

	if r.Protocol == l7.ProtocolDNS {
		ip2fqdn, qType, fqdn, ttl, ips := c.onDNSRequest(r)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				apmTrace.DNSTraceQueryEvent(r, qType, fqdn, ttl, ips)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
		return ip2fqdn
	}

	// traced does not change during the rest of the call. c.l7Attach is only
	// written under c.lock in this file (held here), and the trace id belongs
	// to the event.
	traced := c.l7Attach && c.valuableTrace(r.TraceId)

	// Entry marker of a traced request: record the start event and stop.
	if traced && r.TraceStart == TRACE_STATUS {
		apmTrace, err := c.getOrInitTrace(r.TraceId)
		if c.AppInfo.AppName != "" {
			klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
		}
		if err != nil {
			return nil
		}
		containerID := ""
		if c.cgroup != nil {
			containerID = c.cgroup.ContainerId
		}
		switch r.Protocol {
		case l7.ProtocolHTTP:
			method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
			// Parse errors are ignored; ip keeps whatever ParseIP returns on failure.
			ip, _ := netaddr.ParseIP(sn)
			apmTrace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), containerID)
			c.SendEvent(apmTrace, r.TraceId)
		case l7.ProtocolGrpc:
			ipAddr, port, grpcContainerID := c.getGrpcServerNetworkInfo()
			apmTrace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), grpcContainerID)
			c.SendEvent(apmTrace, r.TraceId)
		case l7.ProtocolKafka:
			// The peer address comes from the tracked connection, if there is one.
			var sn string
			var sport uint16
			if conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]; conn != nil {
				sn = conn.ActualDest.IP().String()
				sport = conn.ActualDest.Port()
			}
			apmTrace.MQConsumerTraceStartEvent(sn, sport, r, c.GetAppInfo(), containerID)
			c.SendEvent(apmTrace, r.TraceId)
		}
		return nil
	}

	// Exit marker of a traced request: record the end event and stop.
	if traced && r.TraceEnd == TRACE_STATUS {
		klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
		if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
			apmTrace.TraceEndEvent(r)
			c.SendEvent(apmTrace, r.TraceId)
		}
		return nil
	}

	// HTTP and gRPC client spans are recorded here, before the connection
	// lookup, so they survive the stale-connection check below. The protocol
	// switch further down must not record them a second time.
	switch r.Protocol {
	case l7.ProtocolHTTP:
		if traced {
			method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	case l7.ProtocolGrpc:
		klog.Debugln("enter the l7.ProtocolGrpc")
		if traced {
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				apmTrace.GrpcClientTraceQueryEvent(r)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	}

	conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
	if conn == nil {
		// No tracked connection for this pid/fd: fall back to one built from
		// the event's destination address.
		conn = &ActiveConnection{
			Dest:       r.ComponentDAddr,
			ActualDest: r.ComponentDAddr,
			Timestamp:  timestamp,
		}
	}
	if timestamp != 0 && conn.Timestamp != timestamp {
		// The event does not belong to the connection currently tracked under
		// this pid/fd (presumably the fd was reused); drop it.
		return nil
	}

	stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
	switch r.Protocol {
	case l7.ProtocolHTTP:
		if c.AppInfo.AppName != "" {
			klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
		}
		stats.observe(r.Status.Http(), "", r.Duration)

	case l7.ProtocolHTTP2:
		if conn.http2Parser == nil {
			conn.http2Parser = l7.NewHttp2Parser()
		}
		for _, req := range conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration)) {
			stats.observe(req.Status.Http(), "", req.Duration)
		}

	case l7.ProtocolPostgres:
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		if traced {
			if conn.postgresParser == nil {
				conn.postgresParser = l7.NewPostgresParser()
			}
			query := conn.postgresParser.Parse(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolMysql:
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		if traced {
			if conn.mysqlParser == nil {
				conn.mysqlParser = l7.NewMysqlParser()
			}
			query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				// The port whitelist decides whether this endpoint is reported
				// as MySQL or MariaDB.
				dbSystem := semconv.DBSystemMySQL
				l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
				if l7Type == l7.ProtocolMariaDB {
					dbSystem = semconv.DBSystemMariaDB
				}
				apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolDM: // DM (Dameng database)
		stats.observe(r.Status.String(), "", r.Duration)
		if traced {
			if conn.dmParser == nil {
				conn.dmParser = l7.NewDmParser()
			}
			query := conn.dmParser.Parse(r.Payload, r.StatementId)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolMemcached:
		stats.observe(r.Status.String(), "", r.Duration)
		if traced {
			cmd, items := l7.ParseMemcached(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				// One key: "cmd key"; several keys: "cmd [k1 k2 ...]".
				statement := cmd
				if len(items) == 1 {
					statement += " " + items[0]
				} else if len(items) > 1 {
					statement += " " + fmt.Sprintf("[%s]", strings.Join(items, " "))
				}
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolRedis:
		stats.observe(r.Status.String(), "", r.Duration)
		if traced {
			cmd, args := l7.ParseRedis(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				statement := cmd
				if args != "" {
					statement += " " + args
				}
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolGrpc:
		// The client span was recorded before the connection lookup; recording
		// it again here would add a duplicate event to the trace, or resurrect
		// a trace that SendEvent has just flushed and removed.
		stats.observe(r.Status.String(), "", r.Duration)

	case l7.ProtocolMongo:
		stats.observe(r.Status.String(), "", r.Duration)
		if traced {
			query := l7.ParseMongo(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				// The MongoDB query is usually JSON, e.g. {"insert":"users"}.
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolCassandra:
		stats.observe(r.Status.String(), "", r.Duration)
		if traced {
			// The parser is attached to the connection, but the raw payload is
			// what gets reported as the statement.
			if conn.cassandraParser == nil {
				conn.cassandraParser = l7.NewCassandraParser()
			}
			query := string(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolKafka:
		stats.observe(r.Status.String(), "", r.Duration)
		if traced {
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, r.DestAddrString)
			}
			if apmTrace, err := c.getOrInitTrace(r.TraceId); err == nil {
				apmTrace.MQTraceQueryEvent(r.Protocol, semconv.MessagingKafkaClientID("kafka"), "", "", r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}

	case l7.ProtocolRabbitmq, l7.ProtocolNats:
		// Statistics only; no trace events are produced for these protocols yet.
		stats.observe(r.Status.String(), r.Method.String(), 0)

	case l7.ProtocolDubbo2:
		// Statistics only; no trace events are produced for Dubbo2 yet.
		stats.observe(r.Status.String(), "", r.Duration)
	}
	return nil
}
// buildIDs refreshes the cmdline of process pid and derives the application's
// network identity from the container's listen addresses.
//
// Only entries of c.getListens() whose value is 1 and whose address is a
// non-loopback IPv4 address are considered. They are rendered as "ip:port" and
// sorted, and the result is stored as:
//   - c.AppInfo.Sn: the comma-joined list;
//   - c.AppInfo.Sport: the port of the first entry;
//   - c.AppInfo.InstanceIdHash: derived from "<Sn>:<Sport>".
//
// Sorting matters because getListens is a map. Without it, Sn, Sport and the
// instance hash would follow Go's randomized map iteration order and could
// differ between calls for the same set of listens.
//
// It returns true when the identifiers were computed and false when no
// suitable listen address exists. c.lock is held for the whole call.
func (c *Container) buildIDs(pid uint32) bool {
	c.lock.Lock()
	defer c.lock.Unlock()

	if p := c.processes[pid]; p != nil {
		p.cmdline = string(proc.GetRealCmdline(pid))
	}

	type listen struct {
		hostPort string
		port     uint16
	}
	var listens []listen
	for address, val := range c.getListens() {
		if val != 1 {
			continue
		}
		if ip := address.IP(); ip.Is4() && !ip.IsLoopback() {
			port := address.Port()
			listens = append(listens, listen{hostPort: fmt.Sprintf("%s:%d", ip, port), port: port})
		}
	}
	if len(listens) == 0 {
		return false
	}
	sort.Slice(listens, func(i, j int) bool { return listens[i].hostPort < listens[j].hostPort })

	sns := make([]string, len(listens))
	for i, l := range listens {
		sns[i] = l.hostPort
	}
	sport := listens[0].port

	c.AppInfo.Sn = strings.Join(sns, ",")
	c.AppInfo.Sport = int(sport)
	strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
	// As before, a conversion error leaves IntVal with whatever ToInt64 returns.
	c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
	c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
	return true
}
// ReBuildIds recomputes the application's identifiers from the container's
// current listen addresses.
//
// Only entries of c.getListens() whose value is 1 and whose address is a
// non-loopback IPv4 address are used. They are rendered as "ip:port" and
// sorted, so the result does not depend on Go's randomized map iteration
// order. If at least one exists, it sets:
//   - c.AppInfo.Sn: the comma-joined "ip:port" list;
//   - c.AppInfo.Sport: the port of the first sorted entry;
//   - c.AppInfo.InstanceIdHash: derived from "<Sn>:<Sport>";
//   - c.AppInfo.AgentId: derived from the host IP and AppIdHash.IntVal;
//   - c.AppInfo.CodeType: the cached code type of pid.
//
// Nothing changes when no suitable listen address exists. c.lock is held for
// the whole call.
func (c *Container) ReBuildIds(pid uint32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	type listen struct {
		hostPort string
		port     uint16
	}
	var listens []listen
	for address, val := range c.getListens() {
		if val != 1 {
			continue
		}
		if ip := address.IP(); ip.Is4() && !ip.IsLoopback() {
			port := address.Port()
			listens = append(listens, listen{hostPort: fmt.Sprintf("%s:%d", ip, port), port: port})
		}
	}
	if len(listens) == 0 {
		return
	}
	sort.Slice(listens, func(i, j int) bool { return listens[i].hostPort < listens[j].hostPort })

	sns := make([]string, len(listens))
	for i, l := range listens {
		sns[i] = l.hostPort
	}
	sport := listens[0].port

	c.AppInfo.Sn = strings.Join(sns, ",")
	c.AppInfo.Sport = int(sport)
	strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
	c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
	c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
	strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
	c.AppInfo.AgentId, _ = strAgentID.ToInt64()
	c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
}
- func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
- c.lock.Lock()
- defer c.lock.Unlock()
- // get the associated uprobe
- uprobe, err := c.GetUprobe(event, tracer)
- if err != nil {
- //fmt.Println("GetUprobeGetUprobe errer: %v", err)
- klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
- return
- }
- if event.TraceId <= 0 {
- //fmt.Println("StackProcess TraceId id 0")
- klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
- return
- }
- // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
- stackFun := ebpftracer.StackFunEvent{}
- stackFun.Uprobe = &uprobe
- stackFun.StackEvent = event
- apmTrace, ok := c.getTrace(event.TraceId)
- if ok {
- apmTrace.FunAdd(stackFun)
- }
- }
// byteExtractString converts a fixed-size, NUL-padded name buffer to a string.
//
// The result ends just before the first byte outside printable ASCII
// (0x20..0x7E). That includes the NUL terminator, control characters and any
// byte >= 0x7F. If every byte is printable, the whole buffer is returned.
func byteExtractString(nameString [100]byte) string {
	buf := nameString[:]
	notPrintable := func(r rune) bool { return r < ' ' || r > '~' }
	if cut := bytes.IndexFunc(buf, notPrintable); cut >= 0 {
		buf = buf[:cut]
	}
	return string(buf)
}
- func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
- c.lock.Lock()
- defer c.lock.Unlock()
- // get the associated uprobe
- switch event.Location {
- case 0: // ret
- Funcname := ""
- if event.Type != uint64(CodeTypeJava) {
- uprobe, err := c.GetUprobe(event, tracer)
- if err != nil {
- //fmt.Println("GetUprobeGetUprobe errer: %v", err)
- klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
- return
- }
- Funcname = uprobe.Funcname
- } else {
- ClassName := byteExtractString(event.ClassName)
- MethedName := byteExtractString(event.MethedName)
- Funcname = ClassName + "." + MethedName
- }
- if event.TraceId <= 0 {
- //fmt.Println("StackProcess TraceId id 0")
- klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
- return
- }
- //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
- apmTrace, err := c.getOrInitTrace(event.TraceId)
- if err == nil {
- //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
- duration := event.TimeNsEnd - event.TimeNsStart
- apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
- c.SendEvent(apmTrace, event.TraceId)
- }
- }
- }
// ResolveAddress maps addr to the ELF symbol(s) whose start address precedes it.
//
// symbols must be sorted by Value in ascending order, because it is
// binary-searched. syms collects every symbol that shares the greatest start
// Value not exceeding addr (several symbols may alias one address), in
// descending slice-index order. offset is addr minus that start Value.
// When addr is 0 or lies below the first symbol, it returns nil, 0, nil.
// err is currently always nil.
func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
	if addr == 0 {
		return nil, 0, nil
	}

	// next is the first index whose symbol starts strictly after addr.
	next := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
	if next == 0 {
		return nil, 0, nil
	}

	start := symbols[next-1].Value
	for i := next - 1; i >= 0 && symbols[i].Value == start; i-- {
		syms = append(syms, symbols[i])
	}
	// Every symbol at index >= next starts after addr, hence after start, so
	// none of them can alias start; there is nothing to collect to the right.
	return syms, uint(addr - start), nil
}
- type MemoryMap struct {
- Start, End uint64
- }
- // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
- func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
- file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
- if err != nil {
- return nil, err
- }
- defer file.Close()
- scanner := bufio.NewScanner(file)
- if scanner.Scan() {
- fields := strings.Fields(scanner.Text())
- addresses := strings.Split(fields[0], "-")
- if len(addresses) != 2 {
- return nil, errors.New("unexpected format in /proc/<pid>/maps")
- }
- start, err := strconv.ParseUint(addresses[0], 16, 64)
- if err != nil {
- return nil, err
- }
- end, err := strconv.ParseUint(addresses[1], 16, 64)
- if err != nil {
- return nil, err
- }
- return &MemoryMap{
- Start: start,
- End: end,
- }, nil
- }
- if err := scanner.Err(); err != nil {
- return nil, err
- }
- return nil, errors.New("empty /proc/<pid>/maps")
- }
// GetUprobe finds the uprobe that produced a stack event.
//
// It first converts event.Ip into an offset from the start of the process's
// first memory mapping (the first line of /proc/<pid>/maps). That offset is
// compared against Address+AbsOffset of every uprobe in c.UprobesMap. If none
// matches, event.Ip is resolved against tracer.Symbols and each resulting
// symbol is looked up in tracer.UprobesMap.
//
// It returns an error when the memory map cannot be read (for example because
// the process has already exited) or when no uprobe matches. Previously the
// read error was ignored and the nil map dereferenced, which panicked while
// the caller held c.lock.
func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
	memoryMap, err := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
	if err != nil {
		return uprobe, fmt.Errorf("reading memory map of pid %d: %w", event.Pid, err)
	}

	address := event.Ip - memoryMap.Start
	for _, fun := range c.UprobesMap {
		if fun.Address+fun.AbsOffset == address {
			return fun, nil
		}
	}

	syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
	if err != nil {
		return uprobe, err
	}
	for _, sym := range syms {
		// NOTE(review): "%s" applied to sym.Value (a uint64) renders as
		// "%!s(uint64=N)". It is kept unchanged because the key has to match
		// however tracer.UprobesMap is populated elsewhere. Confirm that, and
		// switch both sides to "%d" together.
		if found, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]; ok {
			return found, nil
		}
	}
	return uprobe, errors.New("uprobe not found")
}
- func (c *Container) GetAppInfo() AppInfo {
- return c.AppInfo
- }
- // 可注入前置
- func (c *Container) checkEventReady() bool {
- c.lock.Lock()
- defer c.lock.Unlock()
- return c.l7EventReady
- }
- func (c *Container) eventReady() {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.l7EventReady = true
- }
- // uprobe前置
- func (c *Container) Isl7AttachSuccess() bool {
- c.lock.Lock()
- defer c.lock.Unlock()
- return c.l7Attach
- }
- func (c *Container) l7AttachSuccess() {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.l7Attach = true
- }
- func (c *Container) ctrlStack(r *Registry, pid uint32) {
- resp, err := c.GetCodeSetting(r)
- if err != nil {
- klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
- return
- }
- if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
- // 有黑白名单规则 &&
- // 之前有注入 先卸载再注入
- // 之前没注入 直接注入
- // 没有有黑白名单 直接卸载
- if c.hasStackRule(resp) {
- if c.stackRuleUpdate(resp) {
- // 重新注入
- err = c.DetachStack(pid, APP_UNINSTALL)
- if err != nil {
- klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
- }
- }
- klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
- c.saveWhiteStackSettingInfo(resp)
- err = c.AttachStack(r.tracer, pid)
- if err != nil {
- c.AppInfo.SetAppStackError()
- klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
- }
- } else {
- if c.noOrigRule() {
- return
- }
- c.saveWhiteStackSettingInfo(resp)
- // 关闭堆栈
- err = c.DetachStack(pid, APP_UNINSTALL)
- if err != nil {
- klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
- }
- }
- } else {
- if c.noOrigRule() {
- return
- }
- c.saveWhiteStackSettingInfo(resp)
- // 关闭堆栈
- err = c.DetachStack(pid, APP_UNINSTALL)
- if err != nil {
- klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
- }
- }
- }
// verifyAttachConditions decides whether process pid should be instrumented.
//
// All of these are required:
//   - the process is known;
//   - the container is event-ready (checkEventReady);
//   - the cached language of pid is known;
//   - the cmdline is non-empty.
//
// The cmdline is then substring-matched against the Filters of each
// white-list entry from r.getWhiteListAll(); entries with empty Filters are
// skipped. On the first match, the entry's AppName and Filters are stored in
// c.WhiteSettingInfo and true is returned. The int result is always 0.
func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
	p := c.processes[pid]
	if p == nil || !c.checkEventReady() {
		return false, 0
	}

	codeType := c.GetCodeTypeFromCache(pid)
	if codeType.IsUnknownCode() {
		klog.WithField("pid", pid).Debug("[verify] unknown language.")
		return false, 0
	}

	cmdline := p.GetCmdline()
	if cmdline == "" {
		return false, 0
	}

	whiteList := r.getWhiteListAll()
	for _, setting := range whiteList {
		rule := setting.Filters
		if rule == "" || !strings.Contains(cmdline, rule) {
			continue
		}
		c.WhiteSettingInfo.AppName = setting.AppName
		c.WhiteSettingInfo.Filters = setting.Filters
		klog.WithField("pid", pid).
			WithField("codeType", codeType.String()).
			WithField("ruleVal", rule).
			WithField("cmdline", cmdline).
			WithField("white list", utils.ToString(whiteList)).
			Infoln("[verify] check successful.")
		return true, 0
	}
	return false, 0
}
- // 1.卸载入口
- func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
- c.lock.Lock()
- defer c.lock.Unlock()
- if p := c.processes[pid]; p != nil {
- err := c.DetachUprobes(tracer, pid, detachType)
- if err != nil {
- klog.WithError(err).Errorln("DetachUprobes Error.")
- }
- err = c.DetachStack(pid, detachType)
- if err != nil {
- klog.WithError(err).Errorln("DetachStack Error.")
- }
- // 关闭7层监控
- c.l7Attach = false
- // 变更应用状态
- if err != nil {
- detachType = detachType.Error()
- }
- c.AppInfo.SetAppStatus(detachType)
- }
- }
// DetachUprobes closes every L7 uprobe link of process pid. It then removes
// the process from the kernel-side proc info map via tracer.DelKProcInfo and
// clears c.AppInfo.EBPFProcInfo.
//
// For APP_UNINSTALL and APP_FUSE it also resets the per-language attach flags,
// presumably so the process can be instrumented again later:
//   - Java: jvmAttachOnce;
//   - Go: goTlsUprobesChecked and openSslUprobesChecked.
//
// It returns an error in three cases:
//   - pid is unknown;
//   - a link fails to close: the function stops at that link and returns its
//     error unchanged;
//   - DelKProcInfo fails: its error is wrapped with %w.
//
// Detach calls it while holding c.lock.
func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
	p := c.processes[pid]
	if p == nil {
		return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
	}

	for _, u := range p.uprobes {
		if err := u.Close(); err != nil {
			return err
		}
	}
	p.uprobes = []link.Link{}

	switch detachType {
	case APP_UNINSTALL, APP_FUSE:
		switch c.GetCodeTypeFromCache(pid) {
		case CodeTypeJava:
			p.jvmAttachOnce = false
		case CodeTypeGo:
			p.goTlsUprobesChecked = false
			p.openSslUprobesChecked = false
		}
	case APP_UPROBE_ERROR:
		klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
	}

	// The uprobes are gone, so drop the process from proc_info_map (kernel side) too.
	if err := tracer.DelKProcInfo(pid); err != nil {
		return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%v: %w", pid, detachType, err)
	}
	klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
	c.AppInfo.EBPFProcInfo = nil
	return nil
}
// DetachStack removes the stack-tracing instrumentation of process pid.
//
// Java processes go through detachJvmStack; all others close their stack
// uprobes directly. On success it clears stackAttachOnce. It returns an error
// if pid is unknown or detaching fails; detach failures are also logged.
// detachType is currently unused.
func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
	p := c.processes[pid]
	if p == nil {
		return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
	}

	var err error
	if c.GetCodeTypeFromCache(pid) == CodeTypeJava {
		err = c.detachJvmStack(pid)
	} else {
		err = p.closeStackUprobes()
	}
	if err != nil {
		klog.WithError(err).Errorln("[detachStack] failed to detach stack")
		return err
	}

	p.stackAttachOnce = false
	return nil
}
- // 1.2.1 卸载 jvm堆栈
- func (c *Container) detachJvmStack(pid uint32) error {
- if p := c.processes[pid]; p != nil {
- //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
- //}
- // 卸载 JavaAgent
- var err error
- if p.stackStatus.IsJattachSuccess() {
- // 卸载堆栈probes
- err = p.closeStackUprobes()
- if err != nil {
- klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
- }
- err = p.uninstallJavaAgent()
- if err != nil {
- klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
- }
- }
- return err
- }
- return nil
- }
// getRootfs returns the container's root filesystem path as reached from the
// agent: metadata.rootfs joined under *flags.HostDirPathPrefix. It returns ""
// when there is no metadata or no rootfs recorded.
func (c *Container) getRootfs() string {
	if c.metadata == nil || c.metadata.rootfs == "" {
		return ""
	}
	return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
}
// BuildActiveApps stores a status snapshot of this container's application
// (process pid) in runtimeApps[pid]. It is a no-op for a nil receiver or when
// no AppName has been registered yet.
//
// Field formats:
//   - Rule: "<filters>|<OpenStack>".
//   - StackStatus: set only when pid is a known process; it is the process's
//     stack status followed by "V=1", or "V=0" when versionFailed is set.
//   - Timestamps: formatted with the layout "060102 15:04:05"; UpdateAt is
//     only set when non-zero.
func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
	if c == nil || c.AppInfo.AppName == "" {
		return
	}
	info := &c.AppInfo
	klog.WithField("pid", pid).WithField("appname", info.AppName).Infof("[BuildActiveApps] container %s is running.", info.AppName)

	detail := AppStatusInfo{
		Pid:             pid,
		ProcName:        c.containerName,
		AppName:         info.AppName,
		Language:        info.CodeType.String(),
		LanguageVersion: info.Version,
		AppID:           info.AppIdHash.IntVal,
		AgentID:         info.AgentId,
		InstanceID:      info.InstanceIdHash.IntVal,
		Sn:              info.Sn,
		Sport:           info.Sport,
		RegisterAt:      time.Unix(info.RegisterAt, 0).Format("060102 15:04:05"),
		PreStatus:       info.PreStatus,
		Status:          info.Status,
		Rule:            fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack),
		Container:       string(c.id),
	}
	if info.UpdateAt != 0 {
		detail.UpdateAt = time.Unix(info.UpdateAt, 0).Format("060102 15:04:05")
	}

	if p := c.processes[pid]; p != nil {
		versionOK := 0
		if !p.versionFailed {
			versionOK = 1
		}
		detail.StackStatus = p.stackStatus.String() + fmt.Sprintf("V=%d", versionOK)
	}
	runtimeApps[pid] = detail
}
- func (c *Container) AgentCtrl(r *Registry, pid uint32) {
- if c == nil {
- //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
- return
- }
- var err error
- verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
- // fusing UNINSTALL
- if r.isFusing && c.Isl7AttachSuccess() {
- c.Detach(r.tracer, pid, APP_FUSE)
- klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
- return
- }
- // verify UNINSTALL
- if !verifyAttachConditions && c.Isl7AttachSuccess() {
- c.Detach(r.tracer, pid, APP_UNINSTALL)
- klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
- return
- }
- if verifyAttachConditions {
- err = c.RegisterAppInfo(r, pid)
- if err != nil {
- klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
- return
- }
- klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
- err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
- if err != nil {
- klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
- return
- } else {
- klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
- }
- // 堆栈控制
- c.ctrlStack(r, pid)
- }
- }
|