|
|
@@ -8,7 +8,6 @@ import (
|
|
|
. "github.com/coroot/coroot-node-agent/utils/modelse"
|
|
|
"github.com/coroot/coroot-node-agent/utils/try"
|
|
|
. "github.com/coroot/coroot-node-agent/utils/worker"
|
|
|
- log "github.com/sirupsen/logrus"
|
|
|
"os"
|
|
|
"regexp"
|
|
|
"strconv"
|
|
|
@@ -138,15 +137,15 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
|
|
|
if clientErr == nil {
|
|
|
// Load health check for the proxy endpoints
|
|
|
try.Go(proxyClient.CheckEndpoints, CatchFn)
|
|
|
- log.Infof("New Proxy Client success.config_server is [%s]", "")
|
|
|
+ klog.Infof("New Proxy Client success.config_server is [%s]", "")
|
|
|
} else {
|
|
|
- log.WithError(clientErr).Errorf("NewProxyClient error, Please check [export CONFIG_ENDPOINT=ip:port]")
|
|
|
+ klog.WithError(clientErr).Errorf("NewProxyClient error, Please check [export CONFIG_ENDPOINT=ip:port]")
|
|
|
return nil, clientErr
|
|
|
}
|
|
|
|
|
|
r.connServer, err = NewServerHTTPWorker()
|
|
|
if err != nil {
|
|
|
- log.Errorf("init connServer error:%s.", err)
|
|
|
+ klog.Errorf("init connServer error:%s.", err)
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
@@ -193,7 +192,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
|
|
|
case now := <-gcTicker.C:
|
|
|
_, err := r.getWhiteList()
|
|
|
if err != nil {
|
|
|
- log.WithError(err).Errorf("connWhiteList error")
|
|
|
+ klog.WithError(err).Errorf("connWhiteList error")
|
|
|
}
|
|
|
runtimeApps := make(map[uint32]AppStatusInfo)
|
|
|
for pid, c := range r.containersByPid {
|
|
|
@@ -368,7 +367,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
|
|
|
case ebpftracer.EventTypeConnectionOpen:
|
|
|
//fmt.Println("ebpftracer.EventTypeConnectionOpen==================", e.Pid)
|
|
|
if c := r.getOrCreateContainer(e.Pid); c != nil {
|
|
|
- c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
|
|
|
+ c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false, e.Duration)
|
|
|
c.eventReady()
|
|
|
if common.IsOpenFilter() && common.IsFilterPid(e.Pid) {
|
|
|
c.WhiteSettingInfo.AppName = enums.TestApp
|
|
|
@@ -390,24 +389,12 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
|
|
|
if c := r.containersByPid[e.Pid]; c != nil {
|
|
|
c.onListenClose(e.Pid, e.SrcAddr)
|
|
|
}
|
|
|
-
|
|
|
- case ebpftracer.EventTypeConnectionOpen:
|
|
|
- if c := r.getOrCreateContainer(e.Pid); c != nil {
|
|
|
- c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false, e.Duration)
|
|
|
- c.attachTlsUprobes(r.tracer, e.Pid)
|
|
|
- } else {
|
|
|
- klog.Infoln("TCP connection from unknown container", e)
|
|
|
- }
|
|
|
case ebpftracer.EventTypeConnectionError:
|
|
|
if c := r.getOrCreateContainer(e.Pid); c != nil {
|
|
|
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true, e.Duration)
|
|
|
} else {
|
|
|
klog.Infoln("TCP connection error from unknown container", e)
|
|
|
}
|
|
|
- case ebpftracer.EventTypeConnectionClose:
|
|
|
- if c := r.containersByPid[e.Pid]; c != nil {
|
|
|
- c.onConnectionClose(e)
|
|
|
- }
|
|
|
case ebpftracer.EventTypeTCPRetransmit:
|
|
|
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
|
|
|
for _, c := range r.containersById {
|
|
|
@@ -448,105 +435,100 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
|
|
|
// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%d, Pid:%d, Location:%d, Goid:%d, TimeNs:%d, Ip:%X, CallerIp:%x, Bp:%x, CallerBp:%x", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
|
|
|
// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%x, FPid:%x, Nid:%x, Level:%d\n", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)
|
|
|
}
|
|
|
- case ebpftracer.EventTypePythonThreadLock:
|
|
|
- if c := r.containersByPid[e.Pid]; c != nil {
|
|
|
- c.pythonThreadLockWaitTime += e.Duration
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// func (r *Registry) getOrCreateContainer(pid uint32) *Container {
|
|
|
-// if c, seen := r.containersByPid[pid]; c != nil {
|
|
|
-// return c
|
|
|
-// } else if seen { // ignored
|
|
|
-// return nil
|
|
|
-// }
|
|
|
-// cg, err := proc.ReadCgroup(pid)
|
|
|
-// if err != nil {
|
|
|
-// if !common.IsNotExist(err) {
|
|
|
-// klog.Warningln("failed to read proc cgroup:", err)
|
|
|
-// }
|
|
|
-// return nil
|
|
|
-// }
|
|
|
-// cgId := fmt.Sprintf("%s/%d", cg.Id, pid)
|
|
|
-// if c := r.containersByCgroupId[cgId]; c != nil {
|
|
|
-// r.containersByPid[pid] = c
|
|
|
-// return c
|
|
|
-// }
|
|
|
-// if cg.ContainerType == cgroup.ContainerTypeSandbox {
|
|
|
-// cmdline := proc.GetCmdline(pid)
|
|
|
-// parts := bytes.Split(cmdline, []byte{0})
|
|
|
-// if len(parts) > 0 {
|
|
|
-// cmd := parts[0]
|
|
|
-// lastArg := parts[len(parts)-1]
|
|
|
-// if (bytes.HasSuffix(cmd, []byte("runsc-sandbox")) || bytes.HasSuffix(cmd, []byte("runsc"))) && containerIdRegexp.Match(lastArg) {
|
|
|
-// cg.ContainerId = string(lastArg)
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// md, err := getContainerMetadata(cg)
|
|
|
-// if err != nil {
|
|
|
-// klog.Warningf("failed to get container metadata for pid %d -> %s: %s", pid, cg.Id, err)
|
|
|
-// return nil
|
|
|
-// }
|
|
|
-// // add ns/workload/podname
|
|
|
-// id, extensionTag := calcId(cg, md, pid)
|
|
|
-
|
|
|
-// //klog.Infof("calculated container id %d -> %s -> %s", pid, cg.Id, id)
|
|
|
-// if id == "" {
|
|
|
-// if cg.Id == "/init.scope" && pid != 1 {
|
|
|
-// klog.Infoln("ignoring without persisting", "cg", cg.Id, "pid", pid)
|
|
|
-// } else {
|
|
|
-// klog.Infoln("ignoring", "cg", cg.Id, "pid", pid)
|
|
|
-// r.containersByPid[pid] = nil
|
|
|
-// }
|
|
|
-// return nil
|
|
|
-// }
|
|
|
-// if c := r.containersById[id]; c != nil {
|
|
|
-// //klog.Warningln("id conflict:", id)
|
|
|
-// if cg.CreatedAt().After(c.cgroup.CreatedAt()) {
|
|
|
-// c.cgroup = cg
|
|
|
-// c.metadata = md
|
|
|
-// c.runLogParser("")
|
|
|
-// if c.nsConntrack != nil {
|
|
|
-// _ = c.nsConntrack.Close()
|
|
|
-// c.nsConntrack = nil
|
|
|
-// }
|
|
|
-// }
|
|
|
-// setK8sTag(c, extensionTag, pid)
|
|
|
-// r.containersByPid[pid] = c
|
|
|
-// r.containersByCgroupId[cgId] = c
|
|
|
-// return c
|
|
|
-// }
|
|
|
-// c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r)
|
|
|
-// if err != nil {
|
|
|
-// klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
|
|
|
-// return nil
|
|
|
-// }
|
|
|
-
|
|
|
-// //klog.Infoln("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
|
|
|
-// // add ns/workload/podname/pid/ctype
|
|
|
-// //sType := fmt.Sprintf("%d", cg.ContainerType)
|
|
|
-
|
|
|
-// setK8sTag(c, extensionTag, pid)
|
|
|
-// if err := prometheus.WrapRegistererWith(setLabels(string(id),
|
|
|
-// extensionTag[Namespace],
|
|
|
-// extensionTag[PodName],
|
|
|
-// extensionTag[ProcessName],
|
|
|
-// fmt.Sprintf("%d", pid)), r.reg).Register(c); err != nil {
|
|
|
-// klog.Warningln("failed to register container:", err)
|
|
|
-// return nil
|
|
|
-// }
|
|
|
-// r.containersByPid[pid] = c
|
|
|
-// r.containersByCgroupId[cgId] = c
|
|
|
-// r.containersById[id] = c
|
|
|
-// return c
|
|
|
-// }
|
|
|
// getOrCreateContainer returns the Container tracked for pid, creating and
// registering a new one when neither the pid, its cgroup key, nor its
// calculated container id is known yet.
//
// It returns nil when the pid was previously marked as ignored, when the
// cgroup or the container metadata cannot be read, when calcId yields an
// empty id, or when creating or registering the container fails.
//
// NOTE(review): this block is shown as a diff hunk; the "- return nil" line is
// the removed stub body and the "+" lines are the added implementation.
func (r *Registry) getOrCreateContainer(pid uint32) *Container {
|
|
|
- return nil
|
|
|
// Fast path: the pid is already resolved. A present-but-nil entry marks a
// pid that was explicitly ignored by an earlier call (see the id == "" branch).
+ if c, seen := r.containersByPid[pid]; c != nil {
|
|
|
+ return c
|
|
|
+ } else if seen { // ignored
|
|
|
+ return nil
|
|
|
+ }
|
|
|
// Read the cgroup of the process. Not-exist errors are not logged,
// presumably because the process has already exited — TODO confirm.
+ cg, err := proc.ReadCgroup(pid)
|
|
|
+ if err != nil {
|
|
|
+ if !common.IsNotExist(err) {
|
|
|
+ klog.Warningln("failed to read proc cgroup:", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
// Lookup key built from the cgroup id and the pid.
+ cgId := fmt.Sprintf("%s/%d", cg.Id, pid)
|
|
|
// A container already known under this key is reused and cached by pid.
+ if c := r.containersByCgroupId[cgId]; c != nil {
|
|
|
+ r.containersByPid[pid] = c
|
|
|
+ return c
|
|
|
+ }
|
|
|
// Sandbox cgroups: when the command ends with "runsc-sandbox" or "runsc" and
// the last command-line argument matches containerIdRegexp, that argument is
// used as the container id.
+ if cg.ContainerType == cgroup.ContainerTypeSandbox {
|
|
|
+ cmdline := proc.GetCmdline(pid)
|
|
|
// Arguments in cmdline are separated by NUL bytes.
+ parts := bytes.Split(cmdline, []byte{0})
|
|
|
+ if len(parts) > 0 {
|
|
|
+ cmd := parts[0]
|
|
|
+ lastArg := parts[len(parts)-1]
|
|
|
+ if (bytes.HasSuffix(cmd, []byte("runsc-sandbox")) || bytes.HasSuffix(cmd, []byte("runsc"))) && containerIdRegexp.Match(lastArg) {
|
|
|
+ cg.ContainerId = string(lastArg)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ md, err := getContainerMetadata(cg)
|
|
|
+ if err != nil {
|
|
|
+ klog.Warningf("failed to get container metadata for pid %d -> %s: %s", pid, cg.Id, err)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ // add ns/workload/podname
|
|
|
+ id, extensionTag := calcId(cg, md, pid)
|
|
|
+
|
|
|
+ //klog.Infof("calculated container id %d -> %s -> %s", pid, cg.Id, id)
|
|
|
// An empty id means the process is not tracked. The nil marker is persisted
// so later lookups short-circuit, except for pids other than 1 whose cgroup
// is "/init.scope" (presumably such a pid may still move to another cgroup —
// TODO confirm).
+ if id == "" {
|
|
|
+ if cg.Id == "/init.scope" && pid != 1 {
|
|
|
+ klog.Infoln("ignoring without persisting", "cg", cg.Id, "pid", pid)
|
|
|
+ } else {
|
|
|
+ klog.Infoln("ignoring", "cg", cg.Id, "pid", pid)
|
|
|
+ r.containersByPid[pid] = nil
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
// A container with this id already exists: attach this pid and cgroup key
// to it instead of creating a second one.
+ if c := r.containersById[id]; c != nil {
|
|
|
+ //klog.Warningln("id conflict:", id)
|
|
|
// When the new cgroup was created later than the one the container holds,
// switch the container to the new cgroup and metadata, call runLogParser
// again, and close and clear nsConntrack.
+ if cg.CreatedAt().After(c.cgroup.CreatedAt()) {
|
|
|
+ c.cgroup = cg
|
|
|
+ c.metadata = md
|
|
|
+ c.runLogParser("")
|
|
|
+ if c.nsConntrack != nil {
|
|
|
+ _ = c.nsConntrack.Close()
|
|
|
+ c.nsConntrack = nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+ setK8sTag(c, extensionTag, pid)
|
|
|
+ r.containersByPid[pid] = c
|
|
|
+ r.containersByCgroupId[cgId] = c
|
|
|
+ return c
|
|
|
+ }
|
|
|
// First sighting of this id: build a new container.
+ c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r)
|
|
|
+ if err != nil {
|
|
|
+ klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ //klog.Infoln("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
|
|
|
+ // add ns/workload/podname/pid/ctype
|
|
|
+ //sType := fmt.Sprintf("%d", cg.ContainerType)
|
|
|
+
|
|
|
+ setK8sTag(c, extensionTag, pid)
|
|
|
// Register the container on a registerer wrapped with the labels that
// setLabels builds from the id, namespace, pod name, process name and pid.
// The container is only indexed below if registration succeeds.
+ if err := prometheus.WrapRegistererWith(setLabels(string(id),
|
|
|
+ extensionTag[Namespace],
|
|
|
+ extensionTag[PodName],
|
|
|
+ extensionTag[ProcessName],
|
|
|
+ fmt.Sprintf("%d", pid)), r.reg).Register(c); err != nil {
|
|
|
+ klog.Warningln("failed to register container:", err)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
// Index the new container by pid, cgroup key and id.
+ r.containersByPid[pid] = c
|
|
|
+ r.containersByCgroupId[cgId] = c
|
|
|
+ r.containersById[id] = c
|
|
|
+ return c
|
|
|
}
|
|
|
|
|
|
+
|
|
|
func (r *Registry) updateTrafficStatsIfNecessary() {
|
|
|
r.trafficStatsLock.Lock()
|
|
|
defer r.trafficStatsLock.Unlock()
|
|
|
@@ -635,7 +617,7 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID,
|
|
|
namespace := md.env["NOMAD_NAMESPACE"]
|
|
|
task := md.env["NOMAD_TASK_NAME"]
|
|
|
if allocId != "" && group != "" && job != "" && namespace != "" && task != "" {
|
|
|
- return ContainerID(fmt.Sprintf("/nomad/%s/%s/%s/%s/%s", namespace, job, group, allocId, task))
|
|
|
+ return ContainerID(fmt.Sprintf("/nomad/%s/%s/%s/%s/%s", namespace, job, group, allocId, task)), extensionTag
|
|
|
}
|
|
|
}
|
|
|
if md.name == "" { // should be "pure" dockerd container here
|