package worker import ( "bytes" "crypto/tls" "crypto/x509" "errors" "fmt" //"github.com/cloudwise/agentdaemon/src/util" //"github.com/cloudwise/agentdaemon/src/util/conf/dynamicLoad/config" log "github.com/sirupsen/logrus" "io" "io/ioutil" "net" "net/http" "net/url" "os" "strings" "sync" "sync/atomic" "time" ) var gProxyClient *Client const CheckHealthy = 10 * time.Second const FailCount = 3 type Client struct { Endpoints []*Endpoint Current uint32 Mutex sync.RWMutex } type Endpoint struct { Url string FailCount uint32 LastFail time.Time Available atomic.Value Mutex sync.RWMutex } func (e *Endpoint) Failed() { e.Mutex.Lock() defer e.Mutex.Unlock() atomic.AddUint32(&e.FailCount, 1) e.LastFail = time.Now() if e.FailCount >= FailCount { e.Available.Store(false) } } func (e *Endpoint) Succeeded() { e.Mutex.Lock() defer e.Mutex.Unlock() atomic.StoreUint32(&e.FailCount, 0) e.Available.Store(true) } func (c *Client) GetStatus() bool { failAllEndpoint := false for _, v := range c.Endpoints { if v.Available.Load().(bool) { failAllEndpoint = true } } return failAllEndpoint } func (c *Client) DoWithNextEndpoint(method string, uri string, data []byte, header map[string]string, timeOut time.Duration) (*http.Response, error, bool) { c.Mutex.Lock() defer c.Mutex.Unlock() var ( endpoint *Endpoint index uint32 skipNum uint32 noPoints bool ) endpointsNum := len(c.Endpoints) if !c.GetStatus() { noPoints = true return nil, errors.New("all endpoints are unavailable."), noPoints } for i := uint32(0); i < uint32(endpointsNum); i++ { index = (atomic.LoadUint32(&c.Current) + i) % uint32(endpointsNum) if c.Endpoints[index].Available.Load().(bool) { endpoint = c.Endpoints[index] break } skipNum++ } proxyUrl := endpoint.Url + uri log.WithField("module", "proxy").Debugf("proxy is <%s>", proxyUrl) req, err := http.NewRequest(method, proxyUrl, bytes.NewBuffer(data)) if header != nil { contentType, ok := header["Content-Type"] if ok { req.Header.Set("Content-Type", contentType) delete(header, "Content-Type") } for k, v := range header { req.Header.Add(k, v) } } if err != nil { return nil, err, noPoints } // 加载自签名证书 tlsConfig := &tls.Config{InsecureSkipVerify: true} //certFile := util.GetDefaultPath(util.MetaPath, "cert.pem") certFile := "" _, pemErr := os.Stat(certFile) if pemErr == nil { cert, err := ioutil.ReadFile(certFile) if err != nil { log.Fatal(err) } certPool := x509.NewCertPool() certPool.AppendCertsFromPEM(cert) // 配置TLS客户端 tlsConfig = &tls.Config{RootCAs: certPool} } client := &http.Client{ Timeout: timeOut, Transport: &http.Transport{ Dial: func(netw, addr string) (net.Conn, error) { c, err := net.DialTimeout(netw, addr, time.Second*time.Duration(3)) if err != nil { return nil, err } return c, nil }, ResponseHeaderTimeout: time.Second * time.Duration(10), MaxIdleConnsPerHost: 20, IdleConnTimeout: time.Second * time.Duration(10), TLSClientConfig: tlsConfig, }, } resp, clientErr := client.Do(req) log.Infof("[endpoint] %s%s body <%s>", endpoint.Url, uri, string(data)) if clientErr != nil { endpoint.Failed() return nil, errors.New(fmt.Sprintf("[endpoint] %s url %s body %s client err is %v", endpoint.Url, uri, string(data), clientErr)), noPoints } if resp.StatusCode >= 400 { endpoint.Failed() return resp, fmt.Errorf("[endpoint] returned StatusCode=%d.", resp.StatusCode), noPoints } endpoint.Succeeded() atomic.AddUint32(&c.Current, 1+skipNum) return resp, nil, noPoints } func (c *Client) CheckEndpoints() { ticker := time.NewTicker(CheckHealthy) defer ticker.Stop() for range ticker.C { c.Mutex.Lock() for _, endpoint := range c.Endpoints { endpoint.Mutex.Lock() if !endpoint.Available.Load().(bool) && time.Since(endpoint.LastFail) > CheckHealthy { endpoint.Available.Store(true) endpoint.FailCount = 0 } endpoint.Mutex.Unlock() } c.Mutex.Unlock() } } func (c *Client) SetEndpoint(endPointStr string, ssl bool) error { if endPointStr == "" { return fmt.Errorf("endpoints can not empty.") } c.Mutex.Lock() defer c.Mutex.Unlock() serverList := strings.Split(endPointStr, ",") var endpoints []*Endpoint var available atomic.Value phonetic := "http" if ssl { phonetic = "https" } available.Store(true) for _, server := range serverList { if !strings.HasPrefix(server, phonetic) { server = fmt.Sprintf("%s://%s", phonetic, server) } log.WithField("module", "proxy").Infof("proxy server %s", server) endpoints = append(endpoints, &Endpoint{ Url: server, FailCount: 0, LastFail: time.Time{}, Available: available, Mutex: sync.RWMutex{}, }) } c.Endpoints = endpoints c.Current = 0 return nil } func (c *Client) ProxyRequest2(method string, uri string, data []byte, header map[string]string) ([]byte, error) { u, err := url.Parse(uri) if err != nil { log.WithField("module", "proxy").WithError(err).Errorf("url Parse error") return nil, err } uri = u.RequestURI() for { resp, pointErr, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(60)*time.Second) if noPoints { log.WithField("module", "proxy").WithError(pointErr).Errorf("proxy has no healthy points") return nil, pointErr } if pointErr != nil { log.WithField("module", "proxy").WithError(pointErr).Errorf("proxy resp faild") time.Sleep(100 * time.Millisecond) continue } defer func() { if resp != nil { resp.Body.Close() } }() body, bodyError := ioutil.ReadAll(resp.Body) if bodyError != nil { log.WithField("module", "proxy").WithError(bodyError).Errorf("proxy ReadAll Body faild") time.Sleep(100 * time.Millisecond) continue } log.WithField("module", "proxy").Debugf("response: is %s", body) return body, pointErr } } func (c *Client) ProxyRequest(method string, uri string, data []byte, header map[string]string) ([]byte, error) { u, err := url.Parse(uri) if err != nil { log.WithField("module", "proxy").WithError(err).Errorf("url Parse error") return nil, err } uri = u.RequestURI() for { resp, pointErr, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(60)*time.Second) if noPoints { log.WithError(pointErr). Errorf("[proxy] has no healthy points") return nil, pointErr } var body []byte var bodyError error if resp != nil { defer func() { resp.Body.Close() }() body, bodyError = io.ReadAll(resp.Body) if bodyError != nil { log.WithError(bodyError). WithField("request", string(data)). WithField("uri", uri). Errorf("[proxy] readAll Body faild") time.Sleep(100 * time.Millisecond) continue } } if pointErr != nil { log.WithField("request", string(data)). WithField("response", string(body)). WithField("uri", uri). WithError(pointErr). Errorf("[proxy] resp faild") time.Sleep(100 * time.Millisecond) continue } log.Debugf("[proxy] response: is %s", body) return body, pointErr } } func (c *Client) ProxyRespRequest(method string, uri string, data []byte, header map[string]string) (*http.Response, error) { u, err := url.Parse(uri) if err != nil { return nil, err } uri = u.RequestURI() for { resp, err, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(600)*time.Second) if noPoints { log.WithField("module", "proxy").WithError(err).Errorf("proxy has no healthy points") return nil, err } if err != nil { log.WithField("module", "proxy").WithError(err).Errorf("proxy resp faild") time.Sleep(100 * time.Millisecond) continue } return resp, err } } func NewProxyClient(endPointStr string, ssl bool) (*Client, error) { if gProxyClient != nil { return gProxyClient, nil } var err error client := &Client{Mutex: sync.RWMutex{}} err = client.SetEndpoint(endPointStr, ssl) gProxyClient = client return client, err } func GetProxyClient() (*Client, error) { if gProxyClient != nil { return gProxyClient, nil } else { return nil, fmt.Errorf("ProxyClient is nil") } } func DownloadPackage(url, path string) error { http.DefaultClient.Timeout = time.Duration(600) * time.Second //htmlData, err := http.Get(url) proxyClient, _ := GetProxyClient() htmlData, err := proxyClient.ProxyRespRequest(http.MethodGet, url, nil, nil) if err != nil { log.WithError(err).Errorf("There are some network errors.") return err } defer htmlData.Body.Close() if htmlData == nil { return errors.New("Install agent failed,No data was obtained from the URL,URL is" + url) } file, err := os.Create(path) if err != nil { log.Errorf("download package create path[%s] error:%s", path, err) return err } defer file.Close() buf := make([]byte, 1024*5) totalSize := 0 totalSizeBak := 0 lastTime := time.Now() for { size, err := htmlData.Body.Read(buf) if err != nil && err != io.EOF { log.WithError(err).Errorf("There were some errors writing to the file during download.") return err } if size == 0 { break } file.Write(buf[:size]) totalSize += size if totalSize-totalSizeBak > 5*1024*1024 { totalSizeBak = totalSize log.Infof("the current download size is %d MB", totalSize/1024/1024) elapsedTime := time.Since(lastTime) expectedTime := time.Second if expectedTime > elapsedTime { sleepTime := expectedTime - elapsedTime if sleepTime > 0 { time.Sleep(sleepTime) } } lastTime = time.Now() } } log.Infof("the current download size is %d MB", totalSize/1024/1024) return nil }