proxySender.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. package worker
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "errors"
  7. "fmt"
  8. //"github.com/cloudwise/agentdaemon/src/util"
  9. //"github.com/cloudwise/agentdaemon/src/util/conf/dynamicLoad/config"
  10. log "github.com/sirupsen/logrus"
  11. "io"
  12. "io/ioutil"
  13. "net"
  14. "net/http"
  15. "net/url"
  16. "os"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. )
  // gProxyClient is the package-wide proxy client. NewProxyClient assigns it
  // and GetProxyClient returns it.
  var gProxyClient *Client

  const (
  	// CheckHealthy is both the tick period of Client.CheckEndpoints and the
  	// minimum time since an endpoint's last failure before CheckEndpoints
  	// marks it available again.
  	CheckHealthy = 10 * time.Second

  	// FailCount is the number of recorded failures at which Endpoint.Failed
  	// marks the endpoint unavailable.
  	FailCount = 3
  )

  type (
  	// Client spreads requests over a list of endpoints.
  	Client struct {
  		// Endpoints is the list of candidate servers.
  		Endpoints []*Endpoint
  		// Current is the round-robin cursor into Endpoints; it is read and
  		// advanced with sync/atomic operations.
  		Current uint32
  		// Mutex is taken by the methods that read or replace Endpoints.
  		Mutex sync.RWMutex
  	}

  	// Endpoint is one server together with its failure bookkeeping.
  	Endpoint struct {
  		// Url is the base URL; the request URI is appended to it.
  		Url string
  		// FailCount is the number of failures recorded since the last reset.
  		FailCount uint32
  		// LastFail is the time of the most recent recorded failure.
  		LastFail time.Time
  		// Available holds a bool: true while the endpoint may be used.
  		Available atomic.Value
  		// Mutex is taken while FailCount, LastFail and Available are updated.
  		Mutex sync.RWMutex
  	}
  )
  37. func (e *Endpoint) Failed() {
  38. e.Mutex.Lock()
  39. defer e.Mutex.Unlock()
  40. atomic.AddUint32(&e.FailCount, 1)
  41. e.LastFail = time.Now()
  42. if e.FailCount >= FailCount {
  43. e.Available.Store(false)
  44. }
  45. }
  46. func (e *Endpoint) Succeeded() {
  47. e.Mutex.Lock()
  48. defer e.Mutex.Unlock()
  49. atomic.StoreUint32(&e.FailCount, 0)
  50. e.Available.Store(true)
  51. }
  52. func (c *Client) GetStatus() bool {
  53. failAllEndpoint := false
  54. for _, v := range c.Endpoints {
  55. if v.Available.Load().(bool) {
  56. failAllEndpoint = true
  57. }
  58. }
  59. return failAllEndpoint
  60. }
  61. func (c *Client) DoWithNextEndpoint(method string, uri string, data []byte, header map[string]string, timeOut time.Duration) (*http.Response, error, bool) {
  62. c.Mutex.Lock()
  63. defer c.Mutex.Unlock()
  64. var (
  65. endpoint *Endpoint
  66. index uint32
  67. skipNum uint32
  68. noPoints bool
  69. )
  70. endpointsNum := len(c.Endpoints)
  71. if !c.GetStatus() {
  72. noPoints = true
  73. return nil, errors.New("all endpoints are unavailable."), noPoints
  74. }
  75. for i := uint32(0); i < uint32(endpointsNum); i++ {
  76. index = (atomic.LoadUint32(&c.Current) + i) % uint32(endpointsNum)
  77. if c.Endpoints[index].Available.Load().(bool) {
  78. endpoint = c.Endpoints[index]
  79. break
  80. }
  81. skipNum++
  82. }
  83. proxyUrl := endpoint.Url + uri
  84. log.WithField("module", "proxy").Debugf("proxy is <%s>", proxyUrl)
  85. req, err := http.NewRequest(method, proxyUrl, bytes.NewBuffer(data))
  86. if header != nil {
  87. contentType, ok := header["Content-Type"]
  88. if ok {
  89. req.Header.Set("Content-Type", contentType)
  90. delete(header, "Content-Type")
  91. }
  92. for k, v := range header {
  93. req.Header.Add(k, v)
  94. }
  95. }
  96. if err != nil {
  97. return nil, err, noPoints
  98. }
  99. // 加载自签名证书
  100. tlsConfig := &tls.Config{InsecureSkipVerify: true}
  101. //certFile := util.GetDefaultPath(util.MetaPath, "cert.pem")
  102. certFile := ""
  103. _, pemErr := os.Stat(certFile)
  104. if pemErr == nil {
  105. cert, err := ioutil.ReadFile(certFile)
  106. if err != nil {
  107. log.Fatal(err)
  108. }
  109. certPool := x509.NewCertPool()
  110. certPool.AppendCertsFromPEM(cert)
  111. // 配置TLS客户端
  112. tlsConfig = &tls.Config{RootCAs: certPool}
  113. }
  114. client := &http.Client{
  115. Timeout: timeOut,
  116. Transport: &http.Transport{
  117. Dial: func(netw, addr string) (net.Conn, error) {
  118. c, err := net.DialTimeout(netw, addr, time.Second*time.Duration(3))
  119. if err != nil {
  120. return nil, err
  121. }
  122. return c, nil
  123. },
  124. ResponseHeaderTimeout: time.Second * time.Duration(10),
  125. MaxIdleConnsPerHost: 20,
  126. IdleConnTimeout: time.Second * time.Duration(10),
  127. TLSClientConfig: tlsConfig,
  128. },
  129. }
  130. resp, clientErr := client.Do(req)
  131. log.Infof("[endpoint] %s%s body <%s>", endpoint.Url, uri, string(data))
  132. if clientErr != nil {
  133. endpoint.Failed()
  134. return nil, errors.New(fmt.Sprintf("[endpoint] %s url %s body %s client err is %v", endpoint.Url, uri, string(data), clientErr)), noPoints
  135. }
  136. if resp.StatusCode >= 400 {
  137. endpoint.Failed()
  138. return resp, fmt.Errorf("[endpoint] returned StatusCode=%d.", resp.StatusCode), noPoints
  139. }
  140. endpoint.Succeeded()
  141. atomic.AddUint32(&c.Current, 1+skipNum)
  142. return resp, nil, noPoints
  143. }
  144. func (c *Client) CheckEndpoints() {
  145. ticker := time.NewTicker(CheckHealthy)
  146. defer ticker.Stop()
  147. for range ticker.C {
  148. c.Mutex.Lock()
  149. for _, endpoint := range c.Endpoints {
  150. endpoint.Mutex.Lock()
  151. if !endpoint.Available.Load().(bool) && time.Since(endpoint.LastFail) > CheckHealthy {
  152. endpoint.Available.Store(true)
  153. endpoint.FailCount = 0
  154. }
  155. endpoint.Mutex.Unlock()
  156. }
  157. c.Mutex.Unlock()
  158. }
  159. }
  160. func (c *Client) SetEndpoint(endPointStr string, ssl bool) error {
  161. if endPointStr == "" {
  162. return fmt.Errorf("endpoints can not empty.")
  163. }
  164. c.Mutex.Lock()
  165. defer c.Mutex.Unlock()
  166. serverList := strings.Split(endPointStr, ",")
  167. var endpoints []*Endpoint
  168. var available atomic.Value
  169. phonetic := "http"
  170. if ssl {
  171. phonetic = "https"
  172. }
  173. available.Store(true)
  174. for _, server := range serverList {
  175. if !strings.HasPrefix(server, phonetic) {
  176. server = fmt.Sprintf("%s://%s", phonetic, server)
  177. }
  178. log.WithField("module", "proxy").Debugf("proxy server %s", server)
  179. endpoints = append(endpoints, &Endpoint{
  180. Url: server,
  181. FailCount: 0,
  182. LastFail: time.Time{},
  183. Available: available,
  184. Mutex: sync.RWMutex{},
  185. })
  186. }
  187. c.Endpoints = endpoints
  188. c.Current = 0
  189. return nil
  190. }
  191. func (c *Client) ProxyRequest2(method string, uri string, data []byte, header map[string]string) ([]byte, error) {
  192. u, err := url.Parse(uri)
  193. if err != nil {
  194. log.WithField("module", "proxy").WithError(err).Errorf("url Parse error")
  195. return nil, err
  196. }
  197. uri = u.RequestURI()
  198. for {
  199. resp, pointErr, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(60)*time.Second)
  200. if noPoints {
  201. log.WithField("module", "proxy").WithError(pointErr).Errorf("proxy has no healthy points")
  202. return nil, pointErr
  203. }
  204. if pointErr != nil {
  205. log.WithField("module", "proxy").WithError(pointErr).Errorf("proxy resp faild")
  206. time.Sleep(100 * time.Millisecond)
  207. continue
  208. }
  209. defer func() {
  210. if resp != nil {
  211. resp.Body.Close()
  212. }
  213. }()
  214. body, bodyError := ioutil.ReadAll(resp.Body)
  215. if bodyError != nil {
  216. log.WithField("module", "proxy").WithError(bodyError).Errorf("proxy ReadAll Body faild")
  217. time.Sleep(100 * time.Millisecond)
  218. continue
  219. }
  220. log.WithField("module", "proxy").Debugf("response: is %s", body)
  221. return body, pointErr
  222. }
  223. }
  224. func (c *Client) ProxyRequest(method string, uri string, data []byte, header map[string]string) ([]byte, error) {
  225. u, err := url.Parse(uri)
  226. if err != nil {
  227. log.WithField("module", "proxy").WithError(err).Errorf("url Parse error")
  228. return nil, err
  229. }
  230. uri = u.RequestURI()
  231. for {
  232. resp, pointErr, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(60)*time.Second)
  233. if noPoints {
  234. log.WithError(pointErr).
  235. Errorf("[proxy] has no healthy points")
  236. return nil, pointErr
  237. }
  238. var body []byte
  239. var bodyError error
  240. if resp != nil {
  241. defer func() {
  242. resp.Body.Close()
  243. }()
  244. body, bodyError = io.ReadAll(resp.Body)
  245. if bodyError != nil {
  246. log.WithError(bodyError).
  247. WithField("request", string(data)).
  248. WithField("uri", uri).
  249. Errorf("[proxy] readAll Body faild")
  250. time.Sleep(100 * time.Millisecond)
  251. continue
  252. }
  253. }
  254. if pointErr != nil {
  255. log.WithField("request", string(data)).
  256. WithField("response", string(body)).
  257. WithField("uri", uri).
  258. WithError(pointErr).
  259. Errorf("[proxy] resp faild")
  260. time.Sleep(100 * time.Millisecond)
  261. continue
  262. }
  263. log.Debugf("[proxy] response: is %s", body)
  264. return body, pointErr
  265. }
  266. }
  267. func (c *Client) ProxyRespRequest(method string, uri string, data []byte, header map[string]string) (*http.Response, error) {
  268. u, err := url.Parse(uri)
  269. if err != nil {
  270. return nil, err
  271. }
  272. uri = u.RequestURI()
  273. for {
  274. resp, err, noPoints := c.DoWithNextEndpoint(method, uri, data, header, time.Duration(600)*time.Second)
  275. if noPoints {
  276. log.WithField("module", "proxy").WithError(err).Errorf("proxy has no healthy points")
  277. return nil, err
  278. }
  279. if err != nil {
  280. log.WithField("module", "proxy").WithError(err).Errorf("proxy resp faild")
  281. time.Sleep(100 * time.Millisecond)
  282. continue
  283. }
  284. return resp, err
  285. }
  286. }
  287. func NewProxyClient(endPointStr string, ssl bool) (*Client, error) {
  288. if gProxyClient != nil {
  289. return gProxyClient, nil
  290. }
  291. var err error
  292. client := &Client{Mutex: sync.RWMutex{}}
  293. err = client.SetEndpoint(endPointStr, ssl)
  294. gProxyClient = client
  295. return client, err
  296. }
  297. func GetProxyClient() (*Client, error) {
  298. if gProxyClient != nil {
  299. return gProxyClient, nil
  300. } else {
  301. return nil, fmt.Errorf("ProxyClient is nil")
  302. }
  303. }
  304. func DownloadPackage(url, path string) error {
  305. http.DefaultClient.Timeout = time.Duration(600) * time.Second
  306. //htmlData, err := http.Get(url)
  307. proxyClient, _ := GetProxyClient()
  308. htmlData, err := proxyClient.ProxyRespRequest(http.MethodGet, url, nil, nil)
  309. if err != nil {
  310. log.WithError(err).Errorf("There are some network errors.")
  311. return err
  312. }
  313. defer htmlData.Body.Close()
  314. if htmlData == nil {
  315. return errors.New("Install agent failed,No data was obtained from the URL,URL is" + url)
  316. }
  317. file, err := os.Create(path)
  318. if err != nil {
  319. log.Errorf("download package create path[%s] error:%s", path, err)
  320. return err
  321. }
  322. defer file.Close()
  323. buf := make([]byte, 1024*5)
  324. totalSize := 0
  325. totalSizeBak := 0
  326. lastTime := time.Now()
  327. for {
  328. size, err := htmlData.Body.Read(buf)
  329. if err != nil && err != io.EOF {
  330. log.WithError(err).Errorf("There were some errors writing to the file during download.")
  331. return err
  332. }
  333. if size == 0 {
  334. break
  335. }
  336. file.Write(buf[:size])
  337. totalSize += size
  338. if totalSize-totalSizeBak > 5*1024*1024 {
  339. totalSizeBak = totalSize
  340. log.Infof("the current download size is %d MB", totalSize/1024/1024)
  341. elapsedTime := time.Since(lastTime)
  342. expectedTime := time.Second
  343. if expectedTime > elapsedTime {
  344. sleepTime := expectedTime - elapsedTime
  345. if sleepTime > 0 {
  346. time.Sleep(sleepTime)
  347. }
  348. }
  349. lastTime = time.Now()
  350. }
  351. }
  352. log.Infof("the current download size is %d MB", totalSize/1024/1024)
  353. return nil
  354. }