浏览代码

Merge pull request #108 from coroot/python_gil_monitoring

add `container_python_thread_lock_wait_time_seconds` metric
Nikolay Sivko 1 年之前
父节点
当前提交
163e1f1a32
共有 9 个文件被更改,包括 219 次插入35 次删除
  1. 9 3
      containers/container.go
  2. 8 2
      containers/metrics.go
  3. 26 6
      containers/process.go
  4. 5 1
      containers/registry.go
  5. 0 0
      ebpftracer/ebpf.go
  6. 11 9
      ebpftracer/ebpf/ebpf.c
  7. 42 0
      ebpftracer/ebpf/python.c
  8. 84 0
      ebpftracer/python.go
  9. 34 14
      ebpftracer/tracer.go

+ 9 - 3
containers/container.go

@@ -125,7 +125,8 @@ type Container struct {
 	l7Stats  L7Stats
 	dnsStats *L7Metrics
 
-	oomKills int
+	oomKills                 int
+	pythonThreadLockWaitTime time.Duration
 
 	mounts map[string]proc.MountInfo
 
@@ -352,6 +353,10 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 	for appType := range appTypes {
 		ch <- gauge(metrics.ApplicationType, 1, appType)
 	}
+	if c.pythonThreadLockWaitTime > 0 {
+		ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
+	}
+
 	if c.dnsStats.Requests != nil {
 		c.dnsStats.Requests.Collect(ch)
 	}
@@ -367,7 +372,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 	}
 }
 
-func (c *Container) onProcessStart(pid uint32) *Process {
+func (c *Container) onProcessStart(pid uint32, trace *ebpftracer.Tracer) *Process {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	stats, err := TaskstatsPID(pid)
@@ -375,7 +380,8 @@ func (c *Container) onProcessStart(pid uint32) *Process {
 		return nil
 	}
 	c.zombieAt = time.Time{}
-	p := NewProcess(pid, stats)
+	p := NewProcess(pid, stats, trace)
+
 	if p == nil {
 		return nil
 	}

+ 8 - 2
containers/metrics.go

@@ -46,7 +46,10 @@ var metrics = struct {
 	JvmGCTime            *prometheus.Desc
 	JvmSafepointTime     *prometheus.Desc
 	JvmSafepointSyncTime *prometheus.Desc
-	Ip2Fqdn              *prometheus.Desc
+
+	PythonThreadLockWaitTime *prometheus.Desc
+
+	Ip2Fqdn *prometheus.Desc
 }{
 	ContainerInfo: metric("container_info", "Meta information about the container", "image", "systemd_triggered_by"),
 
@@ -89,7 +92,10 @@ var metrics = struct {
 	JvmGCTime:            metric("container_jvm_gc_time_seconds", "Time spent in the given JVM garbage collector in seconds", "jvm", "gc"),
 	JvmSafepointTime:     metric("container_jvm_safepoint_time_seconds", "Time the application has been stopped for safepoint operations in seconds", "jvm"),
 	JvmSafepointSyncTime: metric("container_jvm_safepoint_sync_time_seconds", "Time spent getting to safepoints in seconds", "jvm"),
-	Ip2Fqdn:              metric("ip_to_fqdn", "Mapping IP addresses to FQDNs based on DNS requests initiated by containers", "ip", "fqdn"),
+
+	Ip2Fqdn: metric("ip_to_fqdn", "Mapping IP addresses to FQDNs based on DNS requests initiated by containers", "ip", "fqdn"),
+
+	PythonThreadLockWaitTime: metric("container_python_thread_lock_wait_time_seconds", "Time spent waiting acquiring GIL in seconds"),
 }
 
 var (

+ 26 - 6
containers/process.go

@@ -1,14 +1,15 @@
 package containers
 
 import (
+	"bytes"
 	"context"
 	"os"
 	"time"
 
-	"github.com/jpillora/backoff"
-
 	"github.com/cilium/ebpf/link"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/proc"
+	"github.com/jpillora/backoff"
 	"github.com/mdlayher/taskstats"
 )
 
@@ -27,12 +28,13 @@ type Process struct {
 	uprobes               []link.Link
 	goTlsUprobesChecked   bool
 	openSslUprobesChecked bool
+	pythonGilChecked      bool
 }
 
-func NewProcess(pid uint32, stats *taskstats.Stats) *Process {
+func NewProcess(pid uint32, stats *taskstats.Stats, tracer *ebpftracer.Tracer) *Process {
 	p := &Process{Pid: pid, StartedAt: stats.BeginTime}
 	p.ctx, p.cancelFunc = context.WithCancel(context.Background())
-	go p.instrument()
+	go p.instrument(tracer)
 	return p
 }
 
@@ -52,7 +54,7 @@ func (p *Process) isHostNs() bool {
 	return p.NetNsId() == hostNetNsId
 }
 
-func (p *Process) instrument() {
+func (p *Process) instrument(tracer *ebpftracer.Tracer) {
 	b := backoff.Backoff{Factor: 2, Min: time.Second, Max: time.Minute}
 	for {
 		select {
@@ -64,18 +66,36 @@ func (p *Process) instrument() {
 				return
 			}
 			if dest != "/" {
+				p.instrumentPython(tracer)
 				if dotNetAppName, err := dotNetApp(p.Pid); err == nil {
 					if dotNetAppName != "" {
 						p.dotNetMonitor = NewDotNetMonitor(p.ctx, p.Pid, dotNetAppName)
 					}
-					return
 				}
+				return
 			}
 			time.Sleep(b.Duration())
 		}
 	}
 }
 
+// instrumentPython attaches the Python thread-lock probes to the process when
+// the first word of its command line matches pythonCmd.
+//
+// The check runs at most once per Process: pythonGilChecked is set on the
+// first call, so later calls return immediately even if that attempt bailed out.
+// Links returned by the tracer are appended to p.uprobes, which Close iterates
+// over.
+func (p *Process) instrumentPython(tracer *ebpftracer.Tracer) {
+	if p.pythonGilChecked {
+		return
+	}
+	p.pythonGilChecked = true
+	cmdline := proc.GetCmdline(p.Pid)
+	if len(cmdline) == 0 {
+		return
+	}
+	// The command line is NUL-separated; only the first argument is inspected.
+	parts := bytes.Split(cmdline, []byte{0})
+	fields := bytes.Fields(parts[0])
+	if len(fields) == 0 {
+		// The first argument is empty or whitespace-only (e.g. a process that
+		// rewrote its argv); indexing fields[0] would panic in this goroutine.
+		return
+	}
+	cmd := bytes.TrimSuffix(fields[0], []byte{':'})
+	if !pythonCmd.Match(cmd) {
+		return
+	}
+	p.uprobes = append(p.uprobes, tracer.AttachPythonThreadLockProbes(p.Pid)...)
+}
+
 func (p *Process) Close() {
 	p.cancelFunc()
 	for _, u := range p.uprobes {

+ 5 - 1
containers/registry.go

@@ -206,7 +206,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 				}
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					p := c.onProcessStart(e.Pid)
+					p := c.onProcessStart(e.Pid, r.tracer)
 					if r.processInfoCh != nil && p != nil {
 						r.processInfoCh <- ProcessInfo{Pid: p.Pid, ContainerId: c.id, StartedAt: p.StartedAt}
 					}
@@ -272,6 +272,10 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 					r.ip2fqdnLock.Unlock()
 				}
+			case ebpftracer.EventTypePythonThreadLock:
+				if c := r.containersByPid[e.Pid]; c != nil {
+					c.pythonThreadLockWaitTime += e.Duration
+				}
 			}
 		}
 	}

文件差异内容过多而无法显示
+ 0 - 0
ebpftracer/ebpf.go


+ 11 - 9
ebpftracer/ebpf/ebpf.c

@@ -5,15 +5,16 @@
 #include <bpf/bpf_tracing.h>
 #include <bpf/bpf_endian.h>
 
-#define EVENT_TYPE_PROCESS_START	1
-#define EVENT_TYPE_PROCESS_EXIT		2
-#define EVENT_TYPE_CONNECTION_OPEN	3
-#define EVENT_TYPE_CONNECTION_CLOSE	4
-#define EVENT_TYPE_CONNECTION_ERROR	5
-#define EVENT_TYPE_LISTEN_OPEN		6
-#define EVENT_TYPE_LISTEN_CLOSE 	7
-#define EVENT_TYPE_FILE_OPEN		8
-#define EVENT_TYPE_TCP_RETRANSMIT	9
+#define EVENT_TYPE_PROCESS_START	    1
+#define EVENT_TYPE_PROCESS_EXIT		    2
+#define EVENT_TYPE_CONNECTION_OPEN	    3
+#define EVENT_TYPE_CONNECTION_CLOSE	    4
+#define EVENT_TYPE_CONNECTION_ERROR	    5
+#define EVENT_TYPE_LISTEN_OPEN		    6
+#define EVENT_TYPE_LISTEN_CLOSE 	    7
+#define EVENT_TYPE_FILE_OPEN		    8
+#define EVENT_TYPE_TCP_RETRANSMIT	    9
+#define EVENT_TYPE_PYTHON_THREAD_LOCK	11
 
 #define EVENT_REASON_OOM_KILL		1
 
@@ -39,5 +40,6 @@
 #include "l7/l7.c"
 #include "l7/gotls.c"
 #include "l7/openssl.c"
+#include "python.c"
 
 char _license[] SEC("license") = "GPL";

+ 42 - 0
ebpftracer/ebpf/python.c

@@ -0,0 +1,42 @@
+struct { // perf event array that pthread_cond_timedwait_exit writes python_thread_event records to
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} python_thread_events SEC(".maps");
+
+struct { // hash map: pid_tgid of the calling thread -> bpf_ktime_get_ns() value stored on entry to pthread_cond_timedwait
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(key_size, sizeof(__u64));
+    __uint(value_size, sizeof(__u64));
+    __uint(max_entries, 10240);
+} python_thread_locks SEC(".maps");
+
+// Entry probe for pthread_cond_timedwait: stores the current monotonic
+// timestamp in python_thread_locks under the calling thread's pid_tgid so the
+// return probe can compute how long the call took.
+SEC("uprobe/pthread_cond_timedwait_enter")
+int pthread_cond_timedwait_enter(struct pt_regs *ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    __u64 started_at = bpf_ktime_get_ns();
+
+    bpf_map_update_elem(&python_thread_locks, &id, &started_at, BPF_ANY);
+    return 0;
+}
+
+struct python_thread_event { // record emitted to python_thread_events by pthread_cond_timedwait_exit
+    __u32 type; // set to EVENT_TYPE_PYTHON_THREAD_LOCK by the exit probe
+    __u32 pid; // upper 32 bits of bpf_get_current_pid_tgid()
+    __u64 duration; // bpf_ktime_get_ns() delta between probe entry and return
+};
+
+// Return probe for pthread_cond_timedwait: emits a python_thread_event carrying
+// the time elapsed since the entry probe stored a timestamp for this thread,
+// then removes that timestamp from python_thread_locks.
+SEC("uprobe/pthread_cond_timedwait_exit")
+int pthread_cond_timedwait_exit(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u64 *timestamp = bpf_map_lookup_elem(&python_thread_locks, &pid_tgid);
+    if (!timestamp) {
+        // No stored entry for this thread: nothing to measure.
+        return 0;
+    }
+    // The duration is computed here, while the map value is still present.
+    struct python_thread_event e = {
+        .type = EVENT_TYPE_PYTHON_THREAD_LOCK,
+        .pid = pid_tgid >> 32,
+        .duration = bpf_ktime_get_ns()-*timestamp,
+    };
+    // Drop the consumed entry. Without this, keys of threads that have exited
+    // accumulate until the map reaches max_entries and inserts for new threads fail.
+    bpf_map_delete_elem(&python_thread_locks, &pid_tgid);
+    bpf_perf_event_output(ctx, &python_thread_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+    return 0;
+}

+ 84 - 0
ebpftracer/python.go

@@ -0,0 +1,84 @@
+package ebpftracer
+
+import (
+	"bufio"
+	"os"
+	"regexp"
+	"strings"
+
+	"github.com/cilium/ebpf/link"
+	"github.com/coroot/coroot-node-agent/proc"
+	"k8s.io/klog/v2"
+)
+
+var (
+	libcRegexp = regexp.MustCompile(`libc[\.-]`) // matches "libc." or "libc-" anywhere in a mapped file path
+	muslRegexp = regexp.MustCompile(`musl[\.-]`) // matches "musl." or "musl-" anywhere in a mapped file path
+)
+
+func (t *Tracer) AttachPythonThreadLockProbes(pid uint32) []link.Link {
+	exePath := getPthreadLib(pid)
+	if exePath == "" {
+		return nil
+	}
+
+	log := func(msg string, err error) {
+		if err != nil {
+			for _, s := range []string{"no such file or directory", "no such process", "permission denied"} {
+				if strings.HasSuffix(err.Error(), s) {
+					return
+				}
+			}
+			klog.ErrorfDepth(1, "pid=%d lib=%s: %s: %s", pid, exePath, msg, err)
+			return
+		}
+		klog.InfofDepth(1, "pid=%d lib=%s: %s", pid, exePath, msg)
+	}
+	exe, err := link.OpenExecutable(exePath)
+	if err != nil {
+		log("failed to open executable", err)
+		return nil
+	}
+	var links []link.Link
+	uprobe, err := exe.Uprobe("pthread_cond_timedwait", t.uprobes["pthread_cond_timedwait_enter"], nil)
+	if err != nil {
+		log("failed to attach uprobe", err)
+		return nil
+	}
+	links = append(links, uprobe)
+	uretprobe, err := exe.Uretprobe("pthread_cond_timedwait", t.uprobes["pthread_cond_timedwait_exit"], nil)
+	if err != nil {
+		log("failed to attach uretprobe", err)
+		return nil
+	}
+	links = append(links, uretprobe)
+	log("python uprobes attached", nil)
+	return links
+}
+
+func getPthreadLib(pid uint32) string {
+	f, err := os.Open(proc.Path(pid, "maps"))
+	if err != nil {
+		return ""
+	}
+	defer f.Close()
+	scanner := bufio.NewScanner(f)
+	scanner.Split(bufio.ScanLines)
+	libc := ""
+	for scanner.Scan() {
+		parts := strings.Fields(scanner.Text())
+		if len(parts) <= 5 {
+			continue
+		}
+		libPath := parts[5]
+		switch {
+		case libcRegexp.MatchString(libPath):
+			libc = proc.Path(pid, "root", libPath)
+		case muslRegexp.MatchString(libPath):
+			return proc.Path(pid, "root", libPath)
+		case strings.Contains(libPath, "libpthread"):
+			return proc.Path(pid, "root", libPath)
+		}
+	}
+	return libc
+}

+ 34 - 14
ebpftracer/tracer.go

@@ -29,16 +29,17 @@ type EventType uint32
 type EventReason uint32
 
 const (
-	EventTypeProcessStart    EventType = 1
-	EventTypeProcessExit     EventType = 2
-	EventTypeConnectionOpen  EventType = 3
-	EventTypeConnectionClose EventType = 4
-	EventTypeConnectionError EventType = 5
-	EventTypeListenOpen      EventType = 6
-	EventTypeListenClose     EventType = 7
-	EventTypeFileOpen        EventType = 8
-	EventTypeTCPRetransmit   EventType = 9
-	EventTypeL7Request       EventType = 10
+	EventTypeProcessStart     EventType = 1
+	EventTypeProcessExit      EventType = 2
+	EventTypeConnectionOpen   EventType = 3
+	EventTypeConnectionClose  EventType = 4
+	EventTypeConnectionError  EventType = 5
+	EventTypeListenOpen       EventType = 6
+	EventTypeListenClose      EventType = 7
+	EventTypeFileOpen         EventType = 8
+	EventTypeTCPRetransmit    EventType = 9
+	EventTypeL7Request        EventType = 10
+	EventTypePythonThreadLock EventType = 11
 
 	EventReasonNone    EventReason = 0
 	EventReasonOOMKill EventReason = 1
@@ -59,10 +60,11 @@ type Event struct {
 type perfMapType uint8
 
 const (
-	perfMapTypeProcEvents perfMapType = 1
-	perfMapTypeTCPEvents  perfMapType = 2
-	perfMapTypeFileEvents perfMapType = 3
-	perfMapTypeL7Events   perfMapType = 4
+	perfMapTypeProcEvents         perfMapType = 1
+	perfMapTypeTCPEvents          perfMapType = 2
+	perfMapTypeFileEvents         perfMapType = 3
+	perfMapTypeL7Events           perfMapType = 4
+	perfMapTypePythonThreadEvents perfMapType = 5
 )
 
 type Tracer struct {
@@ -201,6 +203,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
 		{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
 		{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
+		{name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
 	}
 
 	if !t.disableL7Tracing {
@@ -324,6 +327,12 @@ type l7Event struct {
 	PayloadSize         uint64
 }
 
+type pythonThreadEvent struct { // decoded from python_thread_events samples; field order and widths match struct python_thread_event in ebpf/python.c
+	Type     EventType // copied into Event.Type by runEventsReader
+	Pid      uint32 // copied into Event.Pid by runEventsReader
+	Duration uint64 // converted with time.Duration(...) into Event.Duration
+}
+
 func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
 	for {
 		rec, err := r.Read()
@@ -392,6 +401,17 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				Timestamp: v.Timestamp,
 				Duration:  time.Duration(v.Duration),
 			}
+		case perfMapTypePythonThreadEvents:
+			v := &pythonThreadEvent{}
+			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
+				klog.Warningln("failed to read msg:", err)
+				continue
+			}
+			event = Event{
+				Type:     v.Type,
+				Pid:      v.Pid,
+				Duration: time.Duration(v.Duration),
+			}
 		default:
 			continue
 		}

部分文件因为文件数量过多而无法显示