Introduction
The circuit breaker pattern is named after the circuit breaker in an electrical circuit. When the current on a line is too high, the breaker trips and cuts the power, and power is restored once the fault has been repaired. Distributed systems face similar problems: service exceptions and network timeouts take some time to recover from. If a client keeps retrying during that time, every request fails and still consumes resources. The core idea of the circuit breaker pattern is therefore to “prevent infinite retries of a known failed operation”.
Principle
The circuit breaker acts as a proxy in front of the protected operation. It tracks the success and failure of requests and switches state when a threshold is reached, so that callers fail fast while the fault persists.
The circuit breaker pattern is implemented as a state machine with three main states:
- Closed: The default state. Operations are allowed to run, the breaker counts failed operations, and it switches to Open when the failure threshold is reached.
- Open: Operations are rejected immediately with an error instead of being executed, and a timer runs for the recovery time set in advance. When the recovery time has elapsed, the breaker switches to Half-Open.
- Half-Open: A limited number of operations are allowed through. If all of them succeed, the fault is considered recovered and the breaker switches to Closed. If any of them fails, the fault is considered unrecovered and the breaker switches back to Open.
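The transitions between the three states can be sketched as a small pure function. This is only an illustration of the state machine described above, not gobreaker's code: the `Event` type and its values are hypothetical names introduced for this example.

```go
package main

import "fmt"

// State is one of the three circuit breaker states.
type State int

const (
	Closed State = iota
	Open
	HalfOpen
)

// Event is a hypothetical type for this sketch: something the breaker reacts to.
type Event int

const (
	FailureThresholdReached Event = iota // too many failures while Closed
	RecoveryTimeElapsed                  // the recovery time has passed while Open
	ProbesSucceeded                      // all trial operations succeeded while Half-Open
	ProbeFailed                          // one trial operation failed while Half-Open
)

// next returns the state that follows s when event e occurs.
// Events that do not apply to s leave the state unchanged.
func next(s State, e Event) State {
	switch {
	case s == Closed && e == FailureThresholdReached:
		return Open
	case s == Open && e == RecoveryTimeElapsed:
		return HalfOpen
	case s == HalfOpen && e == ProbesSucceeded:
		return Closed
	case s == HalfOpen && e == ProbeFailed:
		return Open
	}
	return s
}

// String makes states readable when printed.
func (s State) String() string {
	return [...]string{"closed", "open", "half-open"}[s]
}

func main() {
	s := Closed
	events := []Event{
		FailureThresholdReached, // closed -> open
		RecoveryTimeElapsed,     // open -> half-open
		ProbeFailed,             // half-open -> open
		RecoveryTimeElapsed,     // open -> half-open
		ProbesSucceeded,         // half-open -> closed
	}
	for _, e := range events {
		s = next(s, e)
		fmt.Println(s)
	}
}
```

Keeping the transition logic in one function makes it easy to see that Open can only be left through Half-Open, which is what prevents a still-failing service from being flooded as soon as the recovery time ends.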
gobreaker
gobreaker, an open source project from Sony, implements the circuit breaker pattern in Go as a state machine. The implementation is relatively small, at about 350 lines.
Circuit breaker settings
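    // Settings configures a CircuitBreaker.
    type Settings struct {
        Name          string                                  // name of the breaker
        MaxRequests   uint32                                  // requests allowed through while Half-Open (0 means 1)
        Interval      time.Duration                           // counting period in the Closed state
        Timeout       time.Duration                           // how long to stay Open before moving to Half-Open
        ReadyToTrip   func(counts Counts) bool                // decides, after a failure in Closed, whether to open
        OnStateChange func(name string, from State, to State) // called when the state changes
    }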
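To make the role of ReadyToTrip more concrete, here is a minimal sketch of a trip policy based on the failure ratio. The `Counts` type below is a local copy of the counters that gobreaker passes to the callback, declared here only so the example runs on its own. The helper `tripOnFailureRatio` and its thresholds are assumptions made for illustration, not part of the library.

```go
package main

import "fmt"

// Counts is a local stand-in with the same counters as gobreaker's Counts.
type Counts struct {
	Requests             uint32
	TotalSuccesses       uint32
	TotalFailures        uint32
	ConsecutiveSuccesses uint32
	ConsecutiveFailures  uint32
}

// tripOnFailureRatio is a hypothetical helper. It builds a function with the
// ReadyToTrip signature that trips once at least minRequests have been seen
// and the share of failed requests has reached maxRatio.
func tripOnFailureRatio(minRequests uint32, maxRatio float64) func(Counts) bool {
	return func(c Counts) bool {
		// Too little data to judge; this also avoids dividing by zero.
		if c.Requests == 0 || c.Requests < minRequests {
			return false
		}
		ratio := float64(c.TotalFailures) / float64(c.Requests)
		return ratio >= maxRatio
	}
}

func main() {
	readyToTrip := tripOnFailureRatio(3, 0.6)

	// Two failures out of two requests: too few requests, stays closed.
	fmt.Println(readyToTrip(Counts{Requests: 2, TotalFailures: 2}))

	// Four failures out of five requests: ratio 0.8, trips.
	fmt.Println(readyToTrip(Counts{Requests: 5, TotalSuccesses: 1, TotalFailures: 4}))

	// One failure out of five requests: ratio 0.2, stays closed.
	fmt.Println(readyToTrip(Counts{Requests: 5, TotalSuccesses: 4, TotalFailures: 1}))
}
```

A ratio-based policy tolerates occasional errors under load, whereas a policy based on ConsecutiveFailures reacts faster to a hard outage. Which one fits depends on the traffic pattern of the protected service.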
Circuit breaker creation and configuration.
    // NewCircuitBreaker returns a CircuitBreaker configured with st.
    // Zero or unset fields fall back to defaults.
    func NewCircuitBreaker(st Settings) *CircuitBreaker {
        cb := new(CircuitBreaker)

        cb.name = st.Name
        cb.onStateChange = st.OnStateChange

        // At least one trial request is allowed in the Half-Open state.
        if st.MaxRequests == 0 {
            cb.maxRequests = 1
        } else {
            cb.maxRequests = st.MaxRequests
        }

        if st.Interval <= 0 {
            cb.interval = defaultInterval
        } else {
            cb.interval = st.Interval
        }

        if st.Timeout <= 0 {
            cb.timeout = defaultTimeout
        } else {
            cb.timeout = st.Timeout
        }

        if st.ReadyToTrip == nil {
            cb.readyToTrip = defaultReadyToTrip
        } else {
            cb.readyToTrip = st.ReadyToTrip
        }

        // Start the first generation in the initial (Closed) state.
        cb.toNewGeneration(time.Now())

        return cb
    }
Execute runs in three main phases: beforeRequest, the request itself (req), and afterRequest.
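    // Execute runs req if the breaker allows it and records the outcome.
    func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
        // Phase 1: check the state and count the request.
        generation, err := cb.beforeRequest()
        if err != nil {
            return nil, err
        }

        // A panic in req counts as a failure and is then re-raised.
        defer func() {
            e := recover()
            if e != nil {
                cb.afterRequest(generation, false)
                panic(e)
            }
        }()

        // Phase 2: run the request.
        result, err := req()
        // Phase 3: record success or failure.
        cb.afterRequest(generation, err == nil)
        return result, err
    }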
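The deferred recover in Execute is worth looking at in isolation: a panic inside the request is recorded as a failure and then re-raised, so the caller still sees it. The sketch below reproduces only that pattern. The `recorder` type and the `run` and `runSafely` functions are hypothetical stand-ins written for this example, not gobreaker APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// recorder is a hypothetical stand-in for the breaker's bookkeeping.
type recorder struct {
	successes int
	failures  int
}

func (r *recorder) record(success bool) {
	if success {
		r.successes++
	} else {
		r.failures++
	}
}

// run has the same shape as Execute: it records the outcome of req and
// treats a panic as a failure before letting the panic continue.
func run(r *recorder, req func() (interface{}, error)) (interface{}, error) {
	defer func() {
		if e := recover(); e != nil {
			r.record(false)
			panic(e) // re-panic so the caller still observes it
		}
	}()
	result, err := req()
	r.record(err == nil)
	return result, err
}

// runSafely turns a panic from run into an error. It exists only so that
// this demo keeps running after the panicking request.
func runSafely(r *recorder, req func() (interface{}, error)) (err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("panic: %v", e)
		}
	}()
	_, err = run(r, req)
	return err
}

func main() {
	r := &recorder{}
	_ = runSafely(r, func() (interface{}, error) { return "ok", nil })
	_ = runSafely(r, func() (interface{}, error) { return nil, errors.New("boom") })
	_ = runSafely(r, func() (interface{}, error) { panic("crash") })
	fmt.Println(r.successes, r.failures) // prints "1 2"
}
```

Without the deferred function, a panicking request would be counted by beforeRequest but never reported to afterRequest, so the breaker would not see it as a failure.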
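toNewGeneration starts a new generation: it increments the generation number, clears the counts, and sets expiry, the time at which the generation ends. The expiry depends on the state:
- Open: expiry is the current time plus the Timeout (the recovery time) from Settings.
- Closed: expiry is the current time plus the Interval from Settings, that is, one monitoring period. If the interval is 0, expiry is left at the zero time and the counts are never cleared on a timer.
- Half-Open: expiry is the zero time. This state ends based on the outcome of up to maxRequests trial requests rather than on a timer.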
    // toNewGeneration starts a new generation and resets the counts.
    func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
        cb.generation++
        cb.counts.clear()

        var zero time.Time
        switch cb.state {
        case StateClosed:
            if cb.interval == 0 {
                cb.expiry = zero // no periodic reset
            } else {
                cb.expiry = now.Add(cb.interval)
            }
        case StateOpen:
            cb.expiry = now.Add(cb.timeout)
        default: // StateHalfOpen
            cb.expiry = zero
        }
    }

    // clear resets all counters to zero.
    func (c *Counts) clear() {
        c.Requests = 0
        c.TotalSuccesses = 0
        c.TotalFailures = 0
        c.ConsecutiveSuccesses = 0
        c.ConsecutiveFailures = 0
    }
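The expiry rule can also be read as a pure function of the state and the two configured durations. The following sketch restates it that way. The function `expiryOffset` is a hypothetical helper for this example, and the state constants are redeclared locally so the block runs on its own.

```go
package main

import (
	"fmt"
	"time"
)

// State and its constants are local stand-ins for this sketch.
type State int

const (
	StateClosed State = iota
	StateHalfOpen
	StateOpen
)

// expiryOffset reports how long after its start a generation expires.
// ok is false when the generation has no time-based expiry.
func expiryOffset(s State, interval, timeout time.Duration) (offset time.Duration, ok bool) {
	switch s {
	case StateClosed:
		if interval == 0 {
			return 0, false // counts are never reset on a timer
		}
		return interval, true
	case StateOpen:
		return timeout, true
	default: // StateHalfOpen ends on request outcomes, not on a timer
		return 0, false
	}
}

func main() {
	// Example values chosen for illustration only.
	interval, timeout := 10*time.Second, 60*time.Second
	now := time.Now()

	for _, s := range []State{StateClosed, StateOpen, StateHalfOpen} {
		offset, ok := expiryOffset(s, interval, timeout)
		if !ok {
			fmt.Println(s, "no time-based expiry")
			continue
		}
		fmt.Println(s, "expires at", now.Add(offset).Format(time.RFC3339))
	}
}
```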
currentState returns the current state and generation, first applying any time-based transition that is due:
- Closed: if expiry is set (non-zero) and has passed, a monitoring period has ended, so a new generation is started and the counts are cleared.
- Open: if expiry has passed, the recovery time has elapsed, so the breaker switches to the Half-Open state.
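    // currentState applies any due time-based transition and returns
    // the resulting state and generation.
    func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
        switch cb.state {
        case StateClosed:
            // The monitoring period is over: reset the counts.
            if !cb.expiry.IsZero() && cb.expiry.Before(now) {
                cb.toNewGeneration(now)
            }
        case StateOpen:
            // The recovery time is over: start probing.
            if cb.expiry.Before(now) {
                cb.setState(StateHalfOpen, now)
            }
        }
        return cb.state, cb.generation
    }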
beforeRequest takes the breaker's mutex so that concurrent requests cannot race on the state and the counts. It calls currentState and then acts according to the state:
- Open: returns ErrOpenState immediately.
- Half-Open with counts.Requests already at or above maxRequests: returns ErrTooManyRequests immediately.
- Closed, or Half-Open with counts.Requests below maxRequests: increments the request count and lets the request proceed.
    // beforeRequest decides whether a request may run and, if so, counts it.
    func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
        cb.mutex.Lock()
        defer cb.mutex.Unlock()

        now := time.Now()
        state, generation := cb.currentState(now)

        if state == StateOpen {
            return generation, ErrOpenState
        } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
            return generation, ErrTooManyRequests
        }

        cb.counts.onRequest()
        return generation, nil
    }

    // onRequest counts one admitted request.
    func (c *Counts) onRequest() {
        c.Requests++
    }
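Stripped of locking and time handling, the admission decision in beforeRequest is a function of the state and the number of requests already admitted in the current generation. The sketch below isolates that decision. The function `admit` and the two error values are hypothetical names for this example, declared locally so the block is self-contained.

```go
package main

import (
	"errors"
	"fmt"
)

// State and its constants are local stand-ins for this sketch.
type State int

const (
	StateClosed State = iota
	StateHalfOpen
	StateOpen
)

// Local error values standing in for ErrOpenState and ErrTooManyRequests.
var (
	errOpen            = errors.New("circuit breaker is open")
	errTooManyRequests = errors.New("too many requests")
)

// admit reports whether a request may run, given the state and the number
// of requests already admitted in the current generation.
func admit(s State, requests, maxRequests uint32) error {
	switch {
	case s == StateOpen:
		return errOpen
	case s == StateHalfOpen && requests >= maxRequests:
		return errTooManyRequests
	}
	return nil
}

func main() {
	// With maxRequests = 2, a half-open breaker admits two trial
	// requests and rejects the third.
	for requests := uint32(0); requests < 3; requests++ {
		fmt.Println(requests, admit(StateHalfOpen, requests, 2))
	}

	// A closed breaker admits requests regardless of the count.
	fmt.Println(admit(StateClosed, 1000, 2))
}
```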
afterRequest also takes the mutex before touching the counts. If the generation has changed since beforeRequest, the result is stale and is ignored. Otherwise the outcome is handled by one of two functions:
- onSuccess: in the Closed state, only the counts are updated. In the Half-Open state, the counts are updated and, if ConsecutiveSuccesses has reached maxRequests, the state changes to Closed.
- onFailure: in the Closed state, the counts are updated and the state changes to Open if readyToTrip returns true. In the Half-Open state, the state changes to Open immediately.
    // afterRequest records the outcome of a request that started in
    // generation before.
    func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
        cb.mutex.Lock()
        defer cb.mutex.Unlock()

        now := time.Now()
        state, generation := cb.currentState(now)
        // The request belongs to an older generation: ignore its result.
        if generation != before {
            return
        }

        if success {
            cb.onSuccess(state, now)
        } else {
            cb.onFailure(state, now)
        }
    }

    func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
        switch state {
        case StateClosed:
            cb.counts.onSuccess()
        case StateHalfOpen:
            cb.counts.onSuccess()
            // Enough consecutive trial requests succeeded: close the breaker.
            if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
                cb.setState(StateClosed, now)
            }
        }
    }

    func (c *Counts) onSuccess() {
        c.TotalSuccesses++
        c.ConsecutiveSuccesses++
        c.ConsecutiveFailures = 0
    }

    func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
        switch state {
        case StateClosed:
            cb.counts.onFailure()
            if cb.readyToTrip(cb.counts) {
                cb.setState(StateOpen, now)
            }
        case StateHalfOpen:
            // A single failed trial request reopens the breaker.
            cb.setState(StateOpen, now)
        }
    }

    func (c *Counts) onFailure() {
        c.TotalFailures++
        c.ConsecutiveFailures++
        c.ConsecutiveSuccesses = 0
    }