pinger.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package pinger
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "os"
  9. "strings"
  10. "syscall"
  11. "time"
  12. "github.com/coroot/coroot-node-agent/proc"
  13. klog "github.com/sirupsen/logrus"
  14. "github.com/vishvananda/netns"
  15. "golang.org/x/net/icmp"
  16. "golang.org/x/net/ipv4"
  17. "golang.org/x/sys/unix"
  18. "inet.af/netaddr"
  19. )
  20. const (
  21. pingReplyPollTimeout = 10 * time.Millisecond
  22. protocolICMP = 1 // Internet Control Message
  23. )
  24. var (
  25. pingerID = os.Getpid() & 0xFFFF
  26. )
  27. type sentPacket struct {
  28. seq int
  29. txTimestamp time.Time
  30. }
  31. func Ping(ns netns.NsHandle, originNs netns.NsHandle, targets []netaddr.IP, timeout time.Duration) (map[netaddr.IP]float64, error) {
  32. if len(targets) < 1 {
  33. return nil, nil
  34. }
  35. var conn *net.IPConn
  36. err := proc.ExecuteInNetNs(ns, originNs, func() error {
  37. c, err := openConn()
  38. if err != nil {
  39. return err
  40. }
  41. conn = c
  42. return nil
  43. })
  44. if err != nil {
  45. return nil, fmt.Errorf("failed to open IPConn: %s", err)
  46. }
  47. defer conn.Close()
  48. f, err := conn.File()
  49. if err != nil {
  50. return nil, err
  51. }
  52. defer f.Close()
  53. fd := int(f.Fd())
  54. ids := make(map[netaddr.IP]*sentPacket, len(targets))
  55. for seq, ip := range targets {
  56. pkt := &sentPacket{seq: seq + 1, txTimestamp: time.Now()}
  57. if err := send(conn, pkt.seq, ip.IPAddr()); err != nil {
  58. if strings.HasPrefix(err.Error(), "resource temporarily unavailable") {
  59. continue
  60. }
  61. return nil, fmt.Errorf("failed to send packet to %s: %s", ip, err)
  62. }
  63. if pkt.txTimestamp, err = getTxTimestamp(fd); err != nil {
  64. if strings.HasPrefix(err.Error(), "resource temporarily unavailable") {
  65. continue
  66. }
  67. return nil, fmt.Errorf("failed to get TX timestamp: %s", err)
  68. }
  69. ids[ip] = pkt
  70. }
  71. timeoutTicker := time.NewTimer(timeout)
  72. defer timeoutTicker.Stop()
  73. rttByIp := make(map[netaddr.IP]float64, len(targets))
  74. for {
  75. select {
  76. case <-timeoutTicker.C:
  77. return rttByIp, nil
  78. default:
  79. if len(rttByIp) == len(targets) {
  80. return rttByIp, nil
  81. }
  82. remoteAddr, echoReply, rxTimestamp, err := receive(conn)
  83. if err != nil {
  84. if !strings.Contains(err.Error(), "interrupted system call") { // recvmsg timeout is not an issue
  85. klog.Errorln(err)
  86. }
  87. continue
  88. }
  89. if echoReply == nil {
  90. continue
  91. }
  92. if echoReply.ID != pingerID {
  93. continue
  94. }
  95. ip, ok := netaddr.FromStdIP(remoteAddr.IP)
  96. if !ok {
  97. continue
  98. }
  99. if pkt, ok := ids[ip]; ok && pkt.seq == echoReply.Seq {
  100. rtt := rxTimestamp.Sub(pkt.txTimestamp).Seconds()
  101. if rtt < 0 { // a small negative value is possible if the clock has adjusted by ntpd
  102. rtt = 0
  103. }
  104. rttByIp[ip] = rtt
  105. }
  106. }
  107. }
  108. }
  109. func send(conn *net.IPConn, seq int, ip net.Addr) error {
  110. msg := &icmp.Message{
  111. Type: ipv4.ICMPTypeEcho,
  112. Body: &icmp.Echo{
  113. ID: pingerID,
  114. Seq: seq,
  115. },
  116. }
  117. data, err := msg.Marshal(nil)
  118. if err != nil {
  119. return err
  120. }
  121. _, err = conn.WriteTo(data, ip)
  122. return err
  123. }
  124. func getTimestampFromOutOfBandData(oob []byte, oobn int) (time.Time, error) {
  125. cms, err := syscall.ParseSocketControlMessage(oob[:oobn])
  126. if err != nil {
  127. return time.Time{}, err
  128. }
  129. for _, cm := range cms {
  130. if cm.Header.Level == syscall.SOL_SOCKET || cm.Header.Type == syscall.SO_TIMESTAMPING {
  131. var t unix.ScmTimestamping
  132. if err := binary.Read(bytes.NewBuffer(cm.Data), binary.LittleEndian, &t); err != nil {
  133. return time.Time{}, err
  134. }
  135. return time.Unix(t.Ts[0].Unix()), nil
  136. }
  137. }
  138. return time.Time{}, fmt.Errorf("no timestamp found")
  139. }
  140. func getTxTimestamp(socketFd int) (time.Time, error) {
  141. pktBuf := make([]byte, 1024)
  142. oob := make([]byte, 1024)
  143. var t time.Time
  144. _, oobn, _, _, err := syscall.Recvmsg(socketFd, pktBuf, oob, syscall.MSG_ERRQUEUE)
  145. if err != nil {
  146. return t, err
  147. }
  148. return getTimestampFromOutOfBandData(oob, oobn)
  149. }
  150. func receive(conn *net.IPConn) (*net.IPAddr, *icmp.Echo, time.Time, error) {
  151. pktBuf := make([]byte, 1024)
  152. oob := make([]byte, 1024)
  153. var ts time.Time
  154. _ = conn.SetReadDeadline(time.Now().Add(pingReplyPollTimeout))
  155. n, oobn, _, ra, err := conn.ReadMsgIP(pktBuf, oob)
  156. if err != nil {
  157. if neterr, ok := err.(*net.OpError); ok && neterr.Timeout() {
  158. return nil, nil, ts, nil
  159. }
  160. if strings.Contains(err.Error(), "no message of desired type") {
  161. return nil, nil, ts, nil
  162. }
  163. return nil, nil, ts, err
  164. }
  165. echo, err := extractEchoFromPacket(pktBuf, n)
  166. if err != nil {
  167. return nil, nil, ts, fmt.Errorf("failed to extract ICMP Echo from IPv4 packet %s: %s", ra, err)
  168. }
  169. if echo == nil {
  170. return nil, nil, ts, nil
  171. }
  172. if ts, err = getTimestampFromOutOfBandData(oob, oobn); err != nil {
  173. return nil, nil, ts, fmt.Errorf("failed to get RX timestamp: %s", err)
  174. }
  175. return ra, echo, ts, nil
  176. }
  177. func extractEchoFromPacket(pktBuf []byte, n int) (*icmp.Echo, error) {
  178. if n < ipv4.HeaderLen {
  179. return nil, errors.New("malformed IPv4 packet")
  180. }
  181. pktBuf = pktBuf[ipv4.HeaderLen:]
  182. var m *icmp.Message
  183. m, err := icmp.ParseMessage(protocolICMP, pktBuf)
  184. if err != nil {
  185. return nil, err
  186. }
  187. if m.Type != ipv4.ICMPTypeEchoReply {
  188. return nil, nil
  189. }
  190. echo, ok := m.Body.(*icmp.Echo)
  191. if !ok {
  192. return nil, fmt.Errorf("malformed ICMP message body: %T", m.Body)
  193. }
  194. return echo, nil
  195. }
  196. func openConn() (*net.IPConn, error) {
  197. conn, err := net.ListenPacket("ip4:icmp", "0.0.0.0")
  198. if err != nil {
  199. return nil, err
  200. }
  201. ipconn := conn.(*net.IPConn)
  202. f, err := ipconn.File()
  203. if err != nil {
  204. return nil, err
  205. }
  206. defer f.Close()
  207. fd := int(f.Fd())
  208. flags := unix.SOF_TIMESTAMPING_SOFTWARE | unix.SOF_TIMESTAMPING_RX_SOFTWARE | unix.SOF_TIMESTAMPING_TX_SCHED |
  209. unix.SOF_TIMESTAMPING_OPT_CMSG | unix.SOF_TIMESTAMPING_OPT_TSONLY
  210. if err := syscall.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_TIMESTAMPING, flags); err != nil {
  211. return nil, err
  212. }
  213. timeout := syscall.Timeval{Sec: 0, Usec: 1000}
  214. if err := syscall.SetsockoptTimeval(fd, unix.SOL_SOCKET, unix.SO_RCVTIMEO, &timeout); err != nil {
  215. return nil, err
  216. }
  217. if err := syscall.SetsockoptTimeval(fd, unix.SOL_SOCKET, unix.SO_SNDTIMEO, &timeout); err != nil {
  218. return nil, err
  219. }
  220. return ipconn, nil
  221. }