| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- package worker
- import (
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "errors"
- "fmt"
- //"github.com/cloudwise/agentdaemon/src/util"
- //"github.com/cloudwise/agentdaemon/src/util/conf/dynamicLoad/config"
- log "github.com/sirupsen/logrus"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "net/url"
- "os"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- var gProxyClient *Client
- const CheckHealthy = 10 * time.Second
- const FailCount = 3
- type Client struct {
- Endpoints []*Endpoint
- Current uint32
- Mutex sync.RWMutex
- }
- type Endpoint struct {
- Url string
- FailCount uint32
- LastFail time.Time
- Available atomic.Value
- Mutex sync.RWMutex
- }
- func (e *Endpoint) Failed() {
- e.Mutex.Lock()
- defer e.Mutex.Unlock()
- atomic.AddUint32(&e.FailCount, 1)
- e.LastFail = time.Now()
- if e.FailCount >= FailCount {
- e.Available.Store(false)
- }
- }
- func (e *Endpoint) Succeeded() {
- e.Mutex.Lock()
- defer e.Mutex.Unlock()
- atomic.StoreUint32(&e.FailCount, 0)
- e.Available.Store(true)
- }
- func (c *Client) GetStatus() bool {
- failAllEndpoint := false
- for _, v := range c.Endpoints {
- if v.Available.Load().(bool) {
- failAllEndpoint = true
- }
- }
- return failAllEndpoint
- }
- func (c *Client) DoWithNextEndpoint(method string, uri string, data []byte, header map[string]string, timeOut time.Duration) (*http.Response, error, bool) {
- c.Mutex.Lock()
- defer c.Mutex.Unlock()
- var (
- endpoint *Endpoint
- index uint32
- skipNum uint32
- noPoints bool
- )
- endpointsNum := len(c.Endpoints)
- if !c.GetStatus() {
- noPoints = true
- return nil, errors.New("all endpoints are unavailable."), noPoints
- }
- for i := uint32(0); i < uint32(endpointsNum); i++ {
- index = (atomic.LoadUint32(&c.Current) + i) % uint32(endpointsNum)
- if c.Endpoints[index].Available.Load().(bool) {
- endpoint = c.Endpoints[index]
- break
- }
- skipNum++
- }
- proxyUrl := endpoint.Url + uri
- log.WithField("module", "proxy").Debugf("proxy is <%s>", proxyUrl)
- req, err := http.NewRequest(method, proxyUrl, bytes.NewBuffer(data))
- if header != nil {
- contentType, ok := header["Content-Type"]
- if ok {
- req.Header.Set("Content-Type", contentType)
- delete(header, "Content-Type")
- }
- for k, v := range header {
- req.Header.Add(k, v)
- }
- }
- if err != nil {
- return nil, err, noPoints
- }
- // 加载自签名证书
- tlsConfig := &tls.Config{InsecureSkipVerify: true}
- //certFile := util.GetDefaultPath(util.MetaPath, "cert.pem")
- certFile := ""
- _, pemErr := os.Stat(certFile)
- if pemErr == nil {
- cert, err := ioutil.ReadFile(certFile)
- if err != nil {
- log.Fatal(err)
- }
- certPool := x509.NewCertPool()
- certPool.AppendCertsFromPEM(cert)
- // 配置TLS客户端
- tlsConfig = &tls.Config{RootCAs: certPool}
- }
- client := &http.Client{
- Timeout: timeOut,
- Transport: &http.Transport{
- Dial: func(netw, addr string) (net.Conn, error) {
- c, err := net.DialTimeout(netw, addr, time.Second*time.Duration(3))
- if err != nil {
- return nil, err
- }
- return c, nil
- },
- ResponseHeaderTimeout: time.Second * time.Duration(10),
- MaxIdleConnsPerHost: 20,
- IdleConnTimeout: time.Second * time.Duration(10),
- TLSClientConfig: tlsConfig,
- },
- }
- resp, clientErr := client.Do(req)
- log.Infof("[endpoint] %s%s body <%s>", endpoint.Url, uri, string(data))
- if clientErr != nil {
- endpoint.Failed()
- return nil, errors.New(fmt.Sprintf("[endpoint] %s url %s body %s client err is %v", endpoint.Url, uri, string(data), clientErr)), noPoints
- }
- if resp.StatusCode >= 400 {
- endpoint.Failed()
- return resp, fmt.Errorf("[endpoint] returned StatusCode=%d.", resp.StatusCode), noPoints
- }
- endpoint.Succeeded()
- atomic.AddUint32(&c.Current, 1+skipNum)
- return resp, nil, noPoints
- }
- func (c *Client) CheckEndpoints() {
- ticker := time.NewTicker(CheckHealthy)
- defer ticker.Stop()
- for range ticker.C {
- c.Mutex.Lock()
- for _, endpoint := range c.Endpoints {
- endpoint.Mutex.Lock()
- if !endpoint.Available.Load().(bool) && time.Since(endpoint.LastFail) > CheckHealthy {
- endpoint.Available.Store(true)
- endpoint.FailCount = 0
- }
- endpoint.Mutex.Unlock()
- }
- c.Mutex.Unlock()
- }
- }
- func (c *Client) SetEndpoint(endPointStr string, ssl bool) error {
- if endPointStr == "" {
- return fmt.Errorf("endpoints can not empty.")
- }
- c.Mutex.Lock()
- defer c.Mutex.Unlock()
- serverList := strings.Split(endPointStr, ",")
- var endpoints []*Endpoint
- var available atomic.Value
- phonetic := "http"
- if ssl {
- phonetic = "https"
- }
- available.Store(true)
- for _, server := range serverList {
- if !strings.HasPrefix(server, phonetic) {
- server = fmt.Sprintf("%s://%s", phonetic, server)
- }
- log.WithField("module", "proxy").Infof("proxy server %s", server)
- endpoints = append(endpoints, &Endpoint{
- Url: server,
- FailCount: 0,
- LastFail: time.Time{},
- Available: available,
- Mutex: sync.RWMutex{},
- })
- }
- c.Endpoints = endpoints
- c.Current = 0
- return nil
- }
- func (c *Client) ProxyRequest2(method string, uri string, data []byte, header map[string]string) ([]byte, error) {
- u, err := url.Parse(uri)
- if err != nil {
- log.WithField("module", "proxy").WithError(err).Errorf("url Parse error")
- return nil, err
- }
- uri = u.RequestURI()
- for {
- resp, pointErr, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(60)*time.Second)
- if noPoints {
- log.WithField("module", "proxy").WithError(pointErr).Errorf("proxy has no healthy points")
- return nil, pointErr
- }
- if pointErr != nil {
- log.WithField("module", "proxy").WithError(pointErr).Errorf("proxy resp faild")
- time.Sleep(100 * time.Millisecond)
- continue
- }
- defer func() {
- if resp != nil {
- resp.Body.Close()
- }
- }()
- body, bodyError := ioutil.ReadAll(resp.Body)
- if bodyError != nil {
- log.WithField("module", "proxy").WithError(bodyError).Errorf("proxy ReadAll Body faild")
- time.Sleep(100 * time.Millisecond)
- continue
- }
- log.WithField("module", "proxy").Debugf("response: is %s", body)
- return body, pointErr
- }
- }
- func (c *Client) ProxyRequest(method string, uri string, data []byte, header map[string]string) ([]byte, error) {
- u, err := url.Parse(uri)
- if err != nil {
- log.WithField("module", "proxy").WithError(err).Errorf("url Parse error")
- return nil, err
- }
- uri = u.RequestURI()
- for {
- resp, pointErr, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(60)*time.Second)
- if noPoints {
- log.WithError(pointErr).
- Errorf("[proxy] has no healthy points")
- return nil, pointErr
- }
- var body []byte
- var bodyError error
- if resp != nil {
- defer func() {
- resp.Body.Close()
- }()
- body, bodyError = io.ReadAll(resp.Body)
- if bodyError != nil {
- log.WithError(bodyError).
- WithField("request", string(data)).
- WithField("uri", uri).
- Errorf("[proxy] readAll Body faild")
- time.Sleep(100 * time.Millisecond)
- continue
- }
- }
- if pointErr != nil {
- log.WithField("request", string(data)).
- WithField("response", string(body)).
- WithField("uri", uri).
- WithError(pointErr).
- Errorf("[proxy] resp faild")
- time.Sleep(100 * time.Millisecond)
- continue
- }
- log.Debugf("[proxy] response: is %s", body)
- return body, pointErr
- }
- }
- func (c *Client) ProxyRespRequest(method string, uri string, data []byte, header map[string]string) (*http.Response, error) {
- u, err := url.Parse(uri)
- if err != nil {
- return nil, err
- }
- uri = u.RequestURI()
- for {
- resp, err, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(600)*time.Second)
- if noPoints {
- log.WithField("module", "proxy").WithError(err).Errorf("proxy has no healthy points")
- return nil, err
- }
- if err != nil {
- log.WithField("module", "proxy").WithError(err).Errorf("proxy resp faild")
- time.Sleep(100 * time.Millisecond)
- continue
- }
- return resp, err
- }
- }
- func NewProxyClient(endPointStr string, ssl bool) (*Client, error) {
- if gProxyClient != nil {
- return gProxyClient, nil
- }
- var err error
- client := &Client{Mutex: sync.RWMutex{}}
- err = client.SetEndpoint(endPointStr, ssl)
- gProxyClient = client
- return client, err
- }
- func GetProxyClient() (*Client, error) {
- if gProxyClient != nil {
- return gProxyClient, nil
- } else {
- return nil, fmt.Errorf("ProxyClient is nil")
- }
- }
- func DownloadPackage(url, path string) error {
- http.DefaultClient.Timeout = time.Duration(600) * time.Second
- //htmlData, err := http.Get(url)
- proxyClient, _ := GetProxyClient()
- htmlData, err := proxyClient.ProxyRespRequest(http.MethodGet, url, nil, nil)
- if err != nil {
- log.WithError(err).Errorf("There are some network errors.")
- return err
- }
- defer htmlData.Body.Close()
- if htmlData == nil {
- return errors.New("Install agent failed,No data was obtained from the URL,URL is" + url)
- }
- file, err := os.Create(path)
- if err != nil {
- log.Errorf("download package create path[%s] error:%s", path, err)
- return err
- }
- defer file.Close()
- buf := make([]byte, 1024*5)
- totalSize := 0
- totalSizeBak := 0
- lastTime := time.Now()
- for {
- size, err := htmlData.Body.Read(buf)
- if err != nil && err != io.EOF {
- log.WithError(err).Errorf("There were some errors writing to the file during download.")
- return err
- }
- if size == 0 {
- break
- }
- file.Write(buf[:size])
- totalSize += size
- if totalSize-totalSizeBak > 5*1024*1024 {
- totalSizeBak = totalSize
- log.Infof("the current download size is %d MB", totalSize/1024/1024)
- elapsedTime := time.Since(lastTime)
- expectedTime := time.Second
- if expectedTime > elapsedTime {
- sleepTime := expectedTime - elapsedTime
- if sleepTime > 0 {
- time.Sleep(sleepTime)
- }
- }
- lastTime = time.Now()
- }
- }
- log.Infof("the current download size is %d MB", totalSize/1024/1024)
- return nil
- }
|