Răsfoiți Sursa

Feature #TASK_QT-21112 【私有发版v8.5】Euspace 应用层的go协程捕获panic堆栈

Tom 1 an în urmă
părinte
comite
f0b4cced07

+ 4 - 1
containers/dotnet.go

@@ -6,6 +6,8 @@ import (
 	"debug/elf"
 	"errors"
 	"fmt"
+	. "github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
 	"math"
 	"net"
 	"path/filepath"
@@ -88,7 +90,8 @@ func NewDotNetMonitor(ctx context.Context, pid uint32, appName string) *DotNetMo
 		threadPoolQueueLength:         newGauge("container_dotnet_thread_pool_queue_length", "The number of work items that are currently queued to be processed in the ThreadPool", constLabels),
 		threadPoolThreadsCount:        newGauge("container_dotnet_thread_pool_size", "The number of thread pool threads that currently exist in the ThreadPool", constLabels),
 	}
-	go m.run(ctx)
+	//go m.run(ctx)
+	try.GoParams(m.run, CatchFn, ctx)
 	return m
 }
 

+ 3 - 1
containers/registry.go

@@ -175,7 +175,9 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, nodeInfo *Node
 	//	return nil, err
 	//}
 	try.Go(r.PullAllAppInfo, CatchFn)
-	go r.handleEvents(r.events)
+	//go r.handleEvents(r.events)
+	try.GoParams(r.handleEvents, CatchFn, r.events)
+
 	if err = r.tracer.Run(r.events); err != nil {
 		close(r.events)
 		return nil, err

+ 4 - 1
ebpftracer/tracer.go

@@ -7,6 +7,8 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
 	"os"
 	"strconv"
 	"strings"
@@ -334,7 +336,8 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 			}
 			t.readers[pm.name] = r
 			// event监听
-			go runEventsReader(pm.name, r, ch, pm.typ)
+			//go runEventsReader(pm.name, r, ch, pm.typ)
+			try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ)
 		}
 	}
 	klog.Infof("[end] Look eBPF perf_maps")

+ 6 - 3
ebpftracer/tracer/offset.go

@@ -6,7 +6,9 @@ import (
 	"errors"
 	"fmt"
 	"github.com/cilium/ebpf"
+	"github.com/coroot/coroot-node-agent/utils"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
+	"github.com/coroot/coroot-node-agent/utils/try"
 	klog "github.com/sirupsen/logrus"
 	"io"
 	"net"
@@ -39,16 +41,17 @@ func kernelOffsetInferServer(listener net.Listener) error {
 
 	//cpuOnlineCount := runtime.NumCPU()
 
-	go func() {
+	try.Go(func() {
 		for {
 			conn, err := listener.Accept()
 			if err != nil {
 				klog.Errorf("[eBPF Kernel Adapt] Fail to accept client request: %v", err)
 				return
 			}
-			go handleConnection(conn)
+			//go handleConnection(conn)
+			try.GoParams(handleConnection, utils.CatchFn, conn)
 		}
-	}()
+	}, utils.CatchFn)
 
 	return nil
 }

+ 4 - 1
kube/client.go

@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 	"github.com/coroot/coroot-node-agent/kube/transfer"
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
 	"github.com/sirupsen/logrus"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -36,7 +38,8 @@ func NewKubeClient() (*transfer.CwK8sClient, error) {
 		return nil, err
 	}
 	cwk8sclient := transfer.NewCwK8sClient(ctx, clientset)
-	go cwk8sclient.InformerSharedfactory.Start(make(chan struct{}, 0))
+	//go cwk8sclient.InformerSharedfactory.Start(make(chan struct{}, 0))
+	try.GoParams(cwk8sclient.InformerSharedfactory.Start, utils.CatchFn, make(chan struct{}, 0))
 	CwK8sClient = cwk8sclient
 	logrus.Info("[kube] connect init success")
 	return cwk8sclient, nil

+ 4 - 1
logs/journald_reader.go

@@ -2,6 +2,8 @@ package logs
 
 import (
 	"fmt"
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
 	"strings"
 	"sync"
 	"time"
@@ -45,7 +47,8 @@ func NewJournaldReader(journalPaths ...string) (*JournaldReader, error) {
 	if r.journal == nil {
 		return nil, fmt.Errorf("systemd journal not found in %s", strings.Join(journalPaths, ","))
 	}
-	go r.follow()
+	//go r.follow()
+	try.Go(r.follow, utils.CatchFn)
 	return r, nil
 }
 

+ 4 - 2
logs/tail_reader.go

@@ -3,6 +3,8 @@ package logs
 import (
 	"bufio"
 	"context"
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
 	"io"
 	"os"
 	"strings"
@@ -48,7 +50,7 @@ func NewTailReader(fileName string, ch chan<- logparser.LogEntry) (*TailReader,
 	}
 	r.reader = bufio.NewReader(r.file)
 
-	go func() {
+	try.Go(func() {
 		var prefix string
 		for {
 			select {
@@ -73,7 +75,7 @@ func NewTailReader(fileName string, ch chan<- logparser.LogEntry) (*TailReader,
 				}
 			}
 		}
-	}()
+	}, utils.CatchFn)
 
 	return r, nil
 }

+ 7 - 4
main.go

@@ -365,7 +365,8 @@ func main() {
 		log.Infoln("Response Body:", string(responseData))
 	}
 	sendNetDataDone := make(chan struct{})
-	go func() {
+
+	try.Go(func() {
 		sendNetDataTicker := time.NewTicker(sendNetDataInterval)
 		defer sendNetDataTicker.Stop()
 		for {
@@ -378,7 +379,7 @@ func main() {
 				}
 			}
 		}
-	}()
+	}, utils.CatchFn)
 
 	/*	metricsHandler := func(w http.ResponseWriter, r *http.Request) {
 		// 从注册表中获取指标数据
@@ -530,13 +531,15 @@ func main() {
 	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
 
 	done := make(chan struct{})
-	go func() {
+
+	try.Go(func() {
 		<-sigs
 		log.Infoln("Signal received, shutting down...")
 		cr.Close()
 		close(sendNetDataDone)
 		close(done)
-	}()
+	}, utils.CatchFn)
+
 	select {
 	case <-done:
 		log.Infoln(flags.AgentName + " exited successfully.")

+ 6 - 3
profiling/profiling.go

@@ -4,6 +4,8 @@ import (
 	"bytes"
 	"crypto/tls"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
 	"hash/fnv"
 	"io"
 	"net/http"
@@ -105,7 +107,8 @@ func Init(hostId, hostName string) chan<- containers.ProcessInfo {
 		session = nil
 		return nil
 	}
-	go collect()
+	//go collect()
+	try.Go(collect, utils.CatchFn)
 
 	processInfoCh := make(chan containers.ProcessInfo)
 	targetFinder.start(processInfoCh)
@@ -207,7 +210,7 @@ type TargetFinder struct {
 }
 
 func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
-	go func() {
+	try.Go(func() {
 		for pi := range processInfoCh {
 			tf.lock.Lock()
 			tf.processes[pi.Pid] = &processInfo{
@@ -216,7 +219,7 @@ func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
 			}
 			tf.lock.Unlock()
 		}
-	}()
+	}, utils.CatchFn)
 }
 
 func (tf *TargetFinder) get(pid uint32) *processInfo {