November 23, 2019
原文:https://kasvith.github.io/posts/lets-create-a-simple-lb-go/
在互联网架构中,负载均衡处于一个举足轻重的地位。它能够帮助我们平衡每个后端服务节点的负载。此外,当某些节点出现问题时,负载均衡还可以根据策略将流量更多的分配给正常的节点,以此来保证服务的稳定性。
在使用过 nginx 之后,我决定造一个简单的负载均衡轮子。我将使用 Go 来实现它,Go 是一种以并发作为第一公民的现代语言。Go 拥有丰富的标准库,能够用较少的代码编写高性能的应用程序。它还能生成一个静态链接的单二进制文件,以方便分发。
常见的负载均衡器提供了多种策略来分配负载,比如说:
我们这里只实现这些策略中最简单的一个 —— Round Robin。
Round Robin 机制简单明了,它后面的每个后端节点都有平等的机会来处理负载。
如图所示,负载均衡会将请求挨个分配到每个后端节点。但我们并不能直接使用它 —— 如果后端某个节点出现故障,我们不希望请求继续打到这个节点上。因此针对这个负载分配机制,我们需要增加一些条件 —— 只将流量分配到健康的节点上。
按照制定好的负载均衡策略,我们需要追踪每个后端节点的详细情况,以便根据其健康状况,将请求打到对应的地址上。
我们先定义一个结构体来保存后端节点的相关信息:
type Backend struct {
URL *url.URL
Alive bool
mux sync.RWMutex
ReverseProxy *httputil.ReverseProxy
}
接下来,我们需要把所有后端节点集中管理起来,于是就有了 ServerPool:
type ServerPool struct {
backends []*Backend
current uint64
}
正如我们之前已经明确过的,负载均衡的真实目的是将流量路由到不同的后端节点,并将结果返回给发起请求的客户端。
根据 Go 文档中的描述:
ReverseProxy 是一个用来代理 HTTP 请求到另一个服务的处理程序。
这正是我们想要的东西。这里没必要造轮子,ReverseProxy 能够满足我们的需求。
u, _ := url.Parse("http://localhost:8080")
rp := httputil.NewSingleHostReverseProxy(u)
// initialize your server and add this as handler
http.HandlerFunc(rp.ServeHTTP)
我们使用 httputil.NewSingleHostReverseProxy(url) 初始化一个反向代理,将请求转发给参数中的 url。
我们看一下 ServeHTTP 方法的定义:
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request)
再看一下 net/http 中对 HandlerFunc 的定义:
type HandlerFunc func(ResponseWriter, *Request)
就这样,我们很自然的实现了一个反向代理。
你可以在文档中找到更多的例子:https://golang.org/pkg/net/http/httputil/#ReverseProxy。
在选择代理到哪个后端节点时,需要跳过出问题的节点。要实现这一点,我们的程序需要做一些特殊的处理。
当多个客户端并发的对负载均衡器发起请求时,我们的负载均衡器就会面临资源竞争的问题了。为了防止多个请求并发的操作 ServerPool,我们可以使用互斥锁来锁定 ServerPool。但引入锁之后会让我们的程序变的复杂,并且并发性能也会有所损失,还有更好的办法吗?我们实际上只是想要知道下一个连接代理到哪个后端去,因此我们只要找到下一个连接对应的后端节点索引就可以了。实际上,我们只需要实现一个支持原子操作的计数器就能满足我们的需求了。
func (s *ServerPool) NextIndex() int {
return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))
}
上面的程序中,我们对当前值进行原子递增操作。我们使用递增后的结果对 backends 的数量求余数,就可以将这次请求,分配到对应的 backend 上。
我们已经知道,在我们实现的负载均衡器中,通过不断地循环后端节点列表,来把请求分配到不同的节点上。在这个过程中,我们的程序需要跳过不健康的节点。
NextIndex 总是返回一个介于 0 - 后端节点数量之间的值,我们拿到一个后端节点后,先判断其健康状态,如果它不健康,则重复以上流程。
以循环的方式遍历切片。
如上图所示,我们希望从下一个位置开始,遍历整个后端节点列表,直到找到一个正常的节点。我们可以通过 next+length 来保证我们的循环能够遍历到每个节点而又不会重复。找到正常的节点之后,顺便把 current 标记为该节点:
// GetNextPeer returns next active peer to take a connection
func (s *ServerPool) GetNextPeer() *Backend {
// loop entire backends to find out an Alive backend
next := s.NextIndex()
l := len(s.backends) + next // start from next and move a full cycle
for i := next; i < l; i++ {
idx := i % len(s.backends) // take an index by modding with length
// if we have an alive backend, use it and store if its not the original one
if s.backends[idx].IsAlive() {
if i != next {
atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one
}
return s.backends[idx]
}
}
return nil
}
有一个很严重的问题我们还没考虑到 —— 我们的 Backend 保存在一个变量中,而这个变量可以被多个 goroutine 同时访问或修改,我们希望同一时间只有一个 goroutine 在修改这个变量,而在修改中的变量不能被其他 goroutine 访问到,不在修改中的变量允许多个 goroutine 并发访问。从这个场景看,我们需要使用读写锁 —— RWMutex 。
// SetAlive for this backend
func (b *Backend) SetAlive(alive bool) {
b.mux.Lock()
b.Alive = alive
b.mux.Unlock()
}
// IsAlive returns true when backend is alive
func (b *Backend) IsAlive() (alive bool) {
b.mux.RLock()
alive = b.Alive
b.mux.RUnlock()
return
}
有了上面的逻辑,我们就能通过下面的方式来平衡收到的请求。只有当所有后端节点都无法访问时,它才会失败。
// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
peer := serverPool.GetNextPeer()
if peer != nil {
peer.ReverseProxy.ServeHTTP(w, r)
return
}
http.Error(w, "Service not available", http.StatusServiceUnavailable)
}
这个方法可以作为一个 HandlerFunc 传入 http server 中。
server := http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: http.HandlerFunc(lb),
}
我们目前的 lb 有个严重的问题 —— 我们不知道节点的健康状况。要知道这一信息,只能尝试访问后端节点,检查它是否还活着。
我们有两种方法可以选择:
后端节点提供一个健康检查接口,负载均衡器每隔一段时间访问这个接口来确认节点的健康状况
ReverseProxy 会触发一个回调函数,ErrorHandler 是用来处理异常的回调函数。我们可以使用这个函数来检测故障。下面是实现方法:
proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
log.Printf("[%s] %s\n", serverUrl.Host, e.Error())
retries := GetRetryFromContext(request)
if retries < 3 {
select {
case <-time.After(10 * time.Millisecond):
ctx := context.WithValue(request.Context(), Retry, retries+1)
proxy.ServeHTTP(writer, request.WithContext(ctx))
}
return
}
// after 3 retries, mark this backend as down
serverPool.MarkBackendStatus(serverUrl, false)
// if the same request routing for few attempts with different backends, increase the count
attempts := GetAttemptsFromContext(request)
log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts)
ctx := context.WithValue(request.Context(), Attempts, attempts+1)
lb(writer, request.WithContext(ctx))
}
在这里,我们利用了闭包的力量来设计这个错误处理程序。它允许我们在方法中捕获外部变量,比如 server url。这个程序会检查重试次数,如果小于 3 我们会再次向同一个后端节点发起相同的请求。这是因为,我们没办法断定这次请求失败的原因,也许只是因为客户端的连接数太多,后端节点没有足够的 socket 来承载连接导致的。因此,我们设置了一个计时器,将重试延迟 10 毫秒。每次重试,我们都会记录重试次数。
超过 3 次重试仍然失败,我们就会把这个后端节点标记为 down。
接下来,我们不能把这个失败的请求丢弃掉,我们会将这个请求分配到别的后端节点上再次尝试,我们将该请求的尝试次数也记录下来,并将这个请求传入 lb 函数,来分配一个新的节点。
我们不能这样无休止的尝试下去,因此需要设置一个最大尝试次数。
我们直接从 context 中获取尝试次数,如果超过了最大尝试次数,就放弃该请求。
// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
attempts := GetAttemptsFromContext(r)
if attempts > 3 {
log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)
http.Error(w, "Service not available", http.StatusServiceUnavailable)
return
}
peer := serverPool.GetNextPeer()
if peer != nil {
peer.ReverseProxy.ServeHTTP(w, r)
return
}
http.Error(w, "Service not available", http.StatusServiceUnavailable)
}
这个实现是递归的。
Context 包允许我们在整个生命周期中存储有用的数据。我们大量使用这种方法来追踪特定数据,比如尝试次数和重试次数。
首先我们要给 Context 指定一个特殊的键。这里建议使用全局唯一的整数键,而不是字符串。Go 语言提供了 iota 关键字来实现递增常量,每个常量包含一个唯一的整数值。这是一个定义整数键的完美解决方案。
const (
Attempts int = iota
Retry
)
然后我们可以像 HashMap 那样检索值,如下所示。默认返回值取决于用例。
// GetAttemptsFromContext returns the attempts for request
func GetRetryFromContext(r *http.Request) int {
if retry, ok := r.Context().Value(Retry).(int); ok {
return retry
}
return 0
}
被动健康检查,可以不断地检查后端节点的状态来确认它的状况。我们通过固定间隔的 ping 来检查后端节点的存活情况。
我们需要先建立一个 TCP 连接来完成 ping 操作。如果后端节点响应,我们就把它标记为正常节点。当然我们也可以访问一个特定的路由(比如:/status)来完成健康检查。确保在连接建立之后关闭它,以减少服务器中的额外负载。否则,服务器会一直尝试维持连接,最终耗尽资源。
// isAlive checks whether a backend is Alive by establishing a TCP connection
func isBackendAlive(u *url.URL) bool {
timeout := 2 * time.Second
conn, err := net.DialTimeout("tcp", u.Host, timeout)
if err != nil {
log.Println("Site unreachable, error: ", err)
return false
}
_ = conn.Close() // close it, we dont need to maintain this connection
return true
}
现在我们可以挨个遍历后端节点,并更新他们的状态了。
// HealthCheck pings the backends and update the status
func (s *ServerPool) HealthCheck() {
for _, b := range s.backends {
status := "up"
alive := isBackendAlive(b.URL)
b.SetAlive(alive)
if !alive {
status = "down"
}
log.Printf("%s [%s]\n", b.URL, status)
}
}
为了定期做健康检查,我们需要在 Go 程序中起一个定时器。定时器会在指定时间到达时,通过 channel 通知做具体处理的 goroutine。
// healthCheck runs a routine for check status of the backends every 2 mins
func healthCheck() {
t := time.NewTicker(time.Second * 20)
for {
select {
case <-t.C:
log.Println("Starting health check...")
serverPool.HealthCheck()
log.Println("Health check completed")
}
}
}
在上面的代码中,<- t.C 通道将每隔 20s 返回一个值。select 会一直等待下面的 case 中有匹配事件的发生。
最后,起一个单独的 goroutine 中做健康检查。
go healthCheck()
在这篇文章中,我们涉及到很多内容:
这个简化的负载均衡器还有很多地方可以进行优化:
等等。
你可以在这里查看源代码:https://github.com/kasvith/simplelb/
感谢阅读~
痕迹
没有过去,就没法认定现在的自己