Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit 7388b75

Browse files
author
Juanjo Alvarez
committed
Added a checker that will detect dead sockets before the timeout (Linux only)
Signed-off-by: Juanjo Alvarez <[email protected]>
1 parent e742bea commit 7388b75

13 files changed

+772
-192
lines changed

go.mod

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
module github.com/src-d/go-mysql-server
22

33
require (
4+
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
45
github.com/VividCortex/gohistogram v1.0.0 // indirect
56
github.com/go-kit/kit v0.8.0
67
github.com/go-ole/go-ole v1.2.4 // indirect
@@ -12,18 +13,16 @@ require (
1213
github.com/mitchellh/hashstructure v1.0.0
1314
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
1415
github.com/opentracing/opentracing-go v1.0.2
15-
github.com/pbnjay/memory v0.0.0-20190104145345-974d429e7ae4
1616
github.com/pilosa/pilosa v1.3.0
1717
github.com/sanity-io/litter v1.1.0
18+
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
1819
github.com/sirupsen/logrus v1.3.0
1920
github.com/spf13/cast v1.3.0
2021
github.com/src-d/go-oniguruma v1.0.0
2122
github.com/stretchr/testify v1.3.0
2223
go.etcd.io/bbolt v1.3.2
2324
google.golang.org/grpc v1.19.0 // indirect
2425
gopkg.in/src-d/go-errors.v1 v1.0.0
25-
gopkg.in/src-d/go-git.v4 v4.13.1
26-
gopkg.in/src-d/go-mysql-server.v0 v0.5.1
2726
gopkg.in/yaml.v2 v2.2.2
2827
vitess.io/vitess v3.0.0-rc.3.0.20190602171040-12bfde34629c+incompatible
2928
)

go.sum

+5-88
Large diffs are not rendered by default.

internal/sockstate/netstat.go

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package sockstate
2+
3+
import (
4+
"fmt"
5+
"net"
6+
)
7+
8+
// OS independent part of the netstat_[OS].go modules
9+
// Taken (simplified, privatized and with utility functions added) from:
10+
// https://github.com/cakturk/go-netstat
11+
12+
// skState type represents socket connection state
13+
type skState uint8
14+
15+
func (s skState) String() string {
16+
return skStates[s]
17+
}
18+
19+
// Socket states
20+
const (
21+
Established skState = 0x01
22+
SynSent skState = 0x02
23+
SynRecv skState = 0x03
24+
FinWait1 skState = 0x04
25+
FinWait2 skState = 0x05
26+
TimeWait skState = 0x06
27+
Close skState = 0x07
28+
CloseWait skState = 0x08
29+
LastAck skState = 0x09
30+
Listen skState = 0x0a
31+
Closing skState = 0x0b
32+
)
33+
34+
var skStates = [...]string{
35+
"UNKNOWN",
36+
"ESTABLISHED",
37+
"SYN_SENT",
38+
"SYN_RECV",
39+
"FIN_WAIT1",
40+
"FIN_WAIT2",
41+
"TIME_WAIT",
42+
"", // CLOSE
43+
"CLOSE_WAIT",
44+
"LAST_ACK",
45+
"LISTEN",
46+
"CLOSING",
47+
}
48+
49+
// sockAddr represents an ip:port pair
50+
type sockAddr struct {
51+
IP net.IP
52+
Port uint16
53+
}
54+
55+
func (s *sockAddr) String() string {
56+
return fmt.Sprintf("%v:%d", s.IP, s.Port)
57+
}
58+
59+
// sockTabEntry type represents each line of the /proc/net/tcp
60+
type sockTabEntry struct {
61+
Ino string
62+
LocalAddr *sockAddr
63+
RemoteAddr *sockAddr
64+
State skState
65+
UID uint32
66+
Process *process
67+
}
68+
69+
// process holds the PID and process name to which each socket belongs
70+
type process struct {
71+
pid int
72+
name string
73+
}
74+
75+
func (p *process) String() string {
76+
return fmt.Sprintf("%d/%s", p.pid, p.name)
77+
}
78+
79+
// AcceptFn is used to filter socket entries. The value returned indicates
80+
// whether the element is to be appended to the socket list.
81+
type AcceptFn func(*sockTabEntry) bool

internal/sockstate/netstat_darwin.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package sockstate
2+
3+
import "github.com/sirupsen/logrus"
4+
5+
// tcpSocks returns a slice of active TCP sockets containing only those
6+
// elements that satisfy the accept function
7+
func tcpSocks(accept AcceptFn) ([]sockTabEntry, error) {
8+
// (juanjux) TODO: not implemented
9+
logrus.Info("Connection checking not implemented for Darwin")
10+
return []sockTabEntry{}, nil
11+
}

internal/sockstate/netstat_linux.go

+208
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package sockstate
2+
3+
// Taken (simplified and with utility functions added) from https://github.com/cakturk/go-netstat
4+
5+
import (
6+
"bufio"
7+
"bytes"
8+
"encoding/binary"
9+
"fmt"
10+
"io"
11+
"io/ioutil"
12+
"log"
13+
"net"
14+
"os"
15+
"path"
16+
"strconv"
17+
"strings"
18+
)
19+
20+
const (
21+
pathTCPTab = "/proc/net/tcp"
22+
ipv4StrLen = 8
23+
)
24+
25+
type procFd struct {
26+
base string
27+
pid int
28+
sktab []sockTabEntry
29+
p *process
30+
}
31+
32+
const sockPrefix = "socket:["
33+
34+
func getProcName(s []byte) string {
35+
i := bytes.Index(s, []byte("("))
36+
if i < 0 {
37+
return ""
38+
}
39+
j := bytes.LastIndex(s, []byte(")"))
40+
if i < 0 {
41+
return ""
42+
}
43+
if i > j {
44+
return ""
45+
}
46+
return string(s[i+1 : j])
47+
}
48+
49+
func (p *procFd) iterFdDir() {
50+
// link name is of the form socket:[5860846]
51+
fddir := path.Join(p.base, "/fd")
52+
fi, err := ioutil.ReadDir(fddir)
53+
if err != nil {
54+
return
55+
}
56+
var buf [128]byte
57+
58+
for _, file := range fi {
59+
fd := path.Join(fddir, file.Name())
60+
lname, err := os.Readlink(fd)
61+
if err != nil {
62+
continue
63+
}
64+
65+
for i := range p.sktab {
66+
sk := &p.sktab[i]
67+
ss := sockPrefix + sk.Ino + "]"
68+
if ss != lname {
69+
continue
70+
}
71+
if p.p == nil {
72+
stat, err := os.Open(path.Join(p.base, "stat"))
73+
if err != nil {
74+
return
75+
}
76+
n, err := stat.Read(buf[:])
77+
_ = stat.Close()
78+
if err != nil {
79+
return
80+
}
81+
z := bytes.SplitN(buf[:n], []byte(" "), 3)
82+
name := getProcName(z[1])
83+
p.p = &process{p.pid, name}
84+
}
85+
sk.Process = p.p
86+
}
87+
}
88+
}
89+
90+
func extractProcInfo(sktab []sockTabEntry) {
91+
const basedir = "/proc"
92+
fi, err := ioutil.ReadDir(basedir)
93+
if err != nil {
94+
return
95+
}
96+
97+
for _, file := range fi {
98+
if !file.IsDir() {
99+
continue
100+
}
101+
pid, err := strconv.Atoi(file.Name())
102+
if err != nil {
103+
continue
104+
}
105+
base := path.Join(basedir, file.Name())
106+
proc := procFd{base: base, pid: pid, sktab: sktab}
107+
proc.iterFdDir()
108+
}
109+
}
110+
111+
func parseIPv4(s string) (net.IP, error) {
112+
v, err := strconv.ParseUint(s, 16, 32)
113+
if err != nil {
114+
return nil, err
115+
}
116+
ip := make(net.IP, net.IPv4len)
117+
binary.LittleEndian.PutUint32(ip, uint32(v))
118+
return ip, nil
119+
}
120+
121+
func parseAddr(s string) (*sockAddr, error) {
122+
fields := strings.Split(s, ":")
123+
if len(fields) < 2 {
124+
return nil, fmt.Errorf("sockstate: not enough fields: %v", s)
125+
}
126+
var ip net.IP
127+
var err error
128+
switch len(fields[0]) {
129+
case ipv4StrLen:
130+
ip, err = parseIPv4(fields[0])
131+
default:
132+
log.Fatal("Bad formatted string")
133+
}
134+
if err != nil {
135+
return nil, err
136+
}
137+
v, err := strconv.ParseUint(fields[1], 16, 16)
138+
if err != nil {
139+
return nil, err
140+
}
141+
return &sockAddr{IP: ip, Port: uint16(v)}, nil
142+
}
143+
144+
func parseSocktab(r io.Reader, accept AcceptFn) ([]sockTabEntry, error) {
145+
br := bufio.NewScanner(r)
146+
tab := make([]sockTabEntry, 0, 4)
147+
148+
// Discard title
149+
br.Scan()
150+
151+
for br.Scan() {
152+
var e sockTabEntry
153+
line := br.Text()
154+
// Skip comments
155+
if i := strings.Index(line, "#"); i >= 0 {
156+
line = line[:i]
157+
}
158+
fields := strings.Fields(line)
159+
if len(fields) < 12 {
160+
return nil, fmt.Errorf("sockstate: not enough fields: %v, %v", len(fields), fields)
161+
}
162+
addr, err := parseAddr(fields[1])
163+
if err != nil {
164+
return nil, err
165+
}
166+
e.LocalAddr = addr
167+
addr, err = parseAddr(fields[2])
168+
if err != nil {
169+
return nil, err
170+
}
171+
e.RemoteAddr = addr
172+
u, err := strconv.ParseUint(fields[3], 16, 8)
173+
if err != nil {
174+
return nil, err
175+
}
176+
e.State = skState(u)
177+
u, err = strconv.ParseUint(fields[7], 10, 32)
178+
if err != nil {
179+
return nil, err
180+
}
181+
e.UID = uint32(u)
182+
e.Ino = fields[9]
183+
if accept(&e) {
184+
tab = append(tab, e)
185+
}
186+
}
187+
return tab, br.Err()
188+
}
189+
190+
// tcpSocks returns a slice of active TCP sockets containing only those
191+
// elements that satisfy the accept function
192+
func tcpSocks(accept AcceptFn) ([]sockTabEntry, error) {
193+
f, err := os.Open(pathTCPTab)
194+
defer func() {
195+
_ = f.Close()
196+
}()
197+
if err != nil {
198+
return nil, err
199+
}
200+
201+
tabs, err := parseSocktab(f, accept)
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
extractProcInfo(tabs)
207+
return tabs, nil
208+
}

internal/sockstate/netstat_windows.go

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package sockstate
2+
3+
import "github.com/sirupsen/logrus"
4+
5+
// tcpSocks returns a slice of active TCP sockets containing only those
6+
// elements that satisfy the accept function
7+
func tcpSocks(accept AcceptFn) ([]sockTabEntry, error) {
8+
// (juanjux) TODO: not implemented
9+
logrus.Info("Connection checking not implemented for Windows")
10+
return []sockTabEntry{}, nil
11+
}

0 commit comments

Comments
 (0)