Development environment setup
Development is based on Ubuntu 18.04.
Install Bazel
sudo wget -O /usr/local/bin/bazel https://github.com/bazelbuild/bazelisk/releases/latest/download/bazelisk-linux-amd64
sudo chmod +x /usr/local/bin/bazel
Install base dependencies
sudo apt-get install libtool cmake automake autoconf make ninja-build curl unzip virtualenv
Clang build environment (optional)
Download and install from LLVM; version 9.0 offers the best compatibility.
bazel/setup_clang.sh <PATH_TO_EXTRACTED_CLANG_LLVM>
echo "build --config=clang" >> user.bazelrc
Building a DEBUG version with Bazel
bazel build -c dbg --spawn_strategy=standalone //source/exe:envoy-static
Integrating CLion
Converting Bazel to CMake
git clone https://github.com/lizan/bazel-cmakelists.git <PATH>
<bazel-cmakelists dir>/bazel-cmakelists --targets //source/exe:envoy-static //test/...
# If you don't need to build, you can add --skip_build:
<bazel-cmakelists dir>/bazel-cmakelists --targets //source/exe:envoy-static //test/... --skip_build
- Open CLion and import the CMake project; CLion's Bazel plugin does not work well with Envoy.
- Set the run configuration to start in Debug mode.
Development environment setup references
Envoy Foundation
Libevent
Envoy is an L7 proxy and communication bus designed for large modern service oriented architectures. The project was born out of the belief that: The network should be transparent to applications. When network and application problems do occur it should be easy to determine the source of the problem.
As described on the official website, Envoy is high-performance proxy software that supports both L4 and L7 proxying. Envoy does not reinvent every wheel, though: its low-level network interaction with the operating system is handled by libevent.
libevent is a lightweight network event library that provides the ability to handle callbacks for events such as reading and writing to the underlying socket.
Libevent's hello-world sample server below shows the typical usage.
#include <string.h>
#include <stdio.h>
#include <signal.h>
#include <errno.h>
#ifndef _WIN32
#include <netinet/in.h>
#include <sys/socket.h>
#else
#include <winsock2.h>
#endif
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/listener.h>
#include <event2/util.h>
#include <event2/event.h>
static const char MESSAGE[] = "Hello, World!\n";
static const int PORT = 9995;
static void listener_cb(struct evconnlistener *, evutil_socket_t,
struct sockaddr *, int socklen, void *);
static void conn_writecb(struct bufferevent *, void *);
static void conn_eventcb(struct bufferevent *, short, void *);
static void signal_cb(evutil_socket_t, short, void *);
int
main(int argc, char **argv)
{
struct event_base *base;
struct evconnlistener *listener;
struct event *signal_event;
struct sockaddr_in sin = {0};
#ifdef _WIN32
WSADATA wsa_data;
WSAStartup(0x0201, &wsa_data);
#endif
base = event_base_new();
if (!base) {
fprintf(stderr, "Could not initialize libevent!\n");
return 1;
}
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
listener = evconnlistener_new_bind(base, listener_cb, (void *)base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
(struct sockaddr*)&sin,
sizeof(sin));
if (!listener) {
fprintf(stderr, "Could not create a listener!\n");
return 1;
}
signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base);
if (!signal_event || event_add(signal_event, NULL)<0) {
fprintf(stderr, "Could not create/add a signal event!\n");
return 1;
}
event_base_dispatch(base);
evconnlistener_free(listener);
event_free(signal_event);
event_base_free(base);
printf("done\n");
return 0;
}
static void
listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sa, int socklen, void *user_data)
{
struct event_base *base = user_data;
struct bufferevent *bev;
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if (!bev) {
fprintf(stderr, "Error constructing bufferevent!");
event_base_loopbreak(base);
return;
}
bufferevent_setcb(bev, NULL, conn_writecb, conn_eventcb, NULL);
bufferevent_enable(bev, EV_WRITE);
bufferevent_disable(bev, EV_READ);
bufferevent_write(bev, MESSAGE, strlen(MESSAGE));
}
static void
conn_writecb(struct bufferevent *bev, void *user_data)
{
struct evbuffer *output = bufferevent_get_output(bev);
if (evbuffer_get_length(output) == 0) {
printf("flushed answer\n");
bufferevent_free(bev);
}
}
static void
conn_eventcb(struct bufferevent *bev, short events, void *user_data)
{
if (events & BEV_EVENT_EOF) {
printf("Connection closed.\n");
} else if (events & BEV_EVENT_ERROR) {
printf("Got an error on the connection: %s\n",
strerror(errno));/*XXX win32*/
}
/* None of the other events can happen here, since we haven't enabled
* timeouts */
bufferevent_free(bev);
}
static void
signal_cb(evutil_socket_t sig, short events, void *user_data)
{
struct event_base *base = user_data;
struct timeval delay = { 2, 0 };
printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");
event_base_loopexit(base, &delay);
}
From the example above we can see that the program handles four kinds of events: listen, read, write, and signal.
Libevent in Envoy
Continuing from the above, libevent also sits at the bottom layer inside Envoy: the data obtained from the various events is handed over to Envoy's internal machinery for processing. Reading is the most common case, so let's use it as our example. Before that, though, let's look at the entry point of Envoy's code.
int main(int argc, char** argv) {
std::unique_ptr<Envoy::MainCommon> main_common;
try {
main_common = std::make_unique<Envoy::MainCommon>(argc, argv);
} catch (const Envoy::NoServingException& e) {
return EXIT_SUCCESS;
} catch (const Envoy::MalformedArgvException& e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
} catch (const Envoy::EnvoyException& e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
}
return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE;
}
Leaving aside the platform-compatibility code, we can see that, as in most system designs, there is a top-level entry class called MainCommon; following a few type jumps from it leads us to the real startup logic.
void InstanceImpl::run() {
// RunHelper exists primarily to facilitate how we respond to early shutdown during
// startup (see RunHelperTest in server_test.cc).
const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
access_log_manager_, init_manager_, overloadManager(), [this] {
notifyCallbacksForStage(Stage::PostInit);
startWorkers(); // ➁
});
// Run the main dispatch loop waiting to exit.
ENVOY_LOG(info, "starting main dispatch loop");
auto watchdog =
guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(), "main_thread");
watchdog->startWatchdog(*dispatcher_);
dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
dispatcher_->run(Event::Dispatcher::RunType::Block); // ➀
ENVOY_LOG(info, "main dispatch loop exited");
guard_dog_->stopWatching(watchdog);
watchdog.reset();
terminate();
}
Ignoring the code surrounding ➀, we find that the final main dispatch loop is right here. After a lot of jumping around we finally peel back the layers and reach the libevent part. (Note the worker threads started at ➁; we will return to the threading model later.)
void LibeventScheduler::run(Dispatcher::RunType mode) {
int flag = 0;
switch (mode) {
case Dispatcher::RunType::NonBlock:
flag = EVLOOP_NONBLOCK;
break;
case Dispatcher::RunType::Block:
// The default flags have 'block' behavior. See
// http://www.wangafu.net/~nickm/libevent-book/Ref3_eventloop.html
break;
case Dispatcher::RunType::RunUntilExit:
flag = EVLOOP_NO_EXIT_ON_EMPTY;
break;
}
event_base_loop(libevent_.get(), flag); // ➀
}
The call at ➀ is exactly the libevent event loop we have been looking for.
int event_base_loop(struct event_base *, int);
An event_base stops running only when it has no registered events left; a quick standalone illustration of this behaviour follows. The next step is to find out when and where the various events are created and registered on the event_base.
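As a minimal sketch (plain libevent, not Envoy code): register a single one-shot timer on an event_base and call event_base_loop with the default blocking flags, the same flag value Envoy passes for RunType::Block; the loop returns as soon as that last event has fired.

#include <cstdio>
#include <sys/time.h>
#include <event2/event.h>

int main() {
  event_base* base = event_base_new();

  // A one-shot timer: after it fires, the base has no registered events left.
  timeval one_sec{1, 0};
  event* timer = event_new(
      base, -1, 0, [](evutil_socket_t, short, void*) { std::puts("timer fired"); }, nullptr);
  event_add(timer, &one_sec);

  // flag = 0 is the default blocking behaviour: run until no events remain.
  event_base_loop(base, 0);
  std::puts("loop exited: no registered events remain");

  event_free(timer);
  event_base_free(base);
  return 0;
}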
Not surprisingly, after some careful investigation, we can find the logic for listening to the port in ListenerImpl::setupServerSocket.
void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));
}
This differs from Netty and other network frameworks: libevent is a global event-listening library that can watch many ports at once. Recall that at this level a socket is not intrinsically a server socket or a client socket; what happens here is precisely that a server socket is created and listenCallback is registered on it, i.e. the callback fired whenever a new connection is accepted.
Inside the listenCallback function:
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
// Create the IoSocketHandleImpl for the fd here.
IoHandlePtr io_handle = std::make_unique<IoSocketHandleImpl>(fd);
// Get the local address from the new socket if the listener is listening on IP ANY
// (e.g., 0.0.0.0 for IPv4) (local_address_ is nullptr in this case).
const Address::InstanceConstSharedPtr& local_address =
listener->local_address_ ? listener->local_address_
: listener->getLocalAddress(io_handle->fd());
// ... (construction of remote_address from remote_addr omitted here)
listener->cb_.onAccept(
std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address));
}
The last line makes it clear that when our server socket accepts a new client connection, the file descriptor fd is wrapped and handed onward.
Editor's note: the idea here is that everything is a file; libevent's read callback is not used at this point, so the fd is handed over to Envoy's own machinery as soon as it is created here.
Looking further, the callback logic that runs when a network request is actually received is as follows.
void ConnectionHandlerImpl::ActiveTcpListener::onAcceptWorker(
Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections, bool rebalanced) {
auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);
// Create and run the filters
config_.filterChainFactory().createListenerFilterChain(*active_socket); ➀
active_socket->continueFilterChain(true);
// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (active_socket->iter_ != active_socket->accept_filters_.end()) {
active_socket->startTimer();
active_socket->moveIntoListBack(std::move(active_socket), sockets_); ➁
}
}
➀ The listener filter chain is created for the socket here.
➁ The socket is pushed onto a list of sockets awaiting further processing. From this we can venture a guess that, for sockets that have completed the connection handshake, Envoy uses thread management similar to the Netty-style Reactor pattern: the accepting thread acts as the listener, and the real worker picks up sockets to serve from this list. A generic sketch of that boss/worker pattern follows.
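For comparison only, here is a minimal, self-contained sketch of the Netty-style boss/worker handoff alluded to above. This is not Envoy code (as discussed later, Envoy actually runs a listener inside every worker); it only illustrates the "accept thread pushes, worker pops" idea.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

struct AcceptedSocket { int fd; };

class WorkerQueue {
public:
  void push(AcceptedSocket s) {
    { std::lock_guard<std::mutex> lock(mu_); q_.push(s); }
    cv_.notify_one();
  }
  AcceptedSocket pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    AcceptedSocket s = q_.front();
    q_.pop();
    return s;
  }
private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<AcceptedSocket> q_;
};

int main() {
  WorkerQueue queue;
  // Worker: drains accepted sockets and serves them.
  std::thread worker([&queue] {
    for (int i = 0; i < 3; ++i) {
      AcceptedSocket s = queue.pop();
      std::printf("worker handling fd %d\n", s.fd);
    }
  });
  // Boss/listener thread: pretends to accept three connections and hands them off.
  for (int fd = 10; fd < 13; ++fd) queue.push({fd});
  worker.join();
  return 0;
}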
Additional information:
After another series of operations we reach the place where a new connection is created (or, more precisely, where it is assigned): ConnectionHandlerImpl::ActiveTcpListener::newConnection.
But we still cannot find the read logic here, so we start debugging and set a breakpoint at the data-reading function ConnectionImpl::onRead.
The call stack still starts from event_base_loop, so the question becomes: where exactly is the read event registered? Since we know events are registered on the event_base object, we search by usage and find a suspicious spot: FileEventImpl::assignEvents.
void FileEventImpl::assignEvents(uint32_t events, event_base* base) {
ASSERT(base != nullptr);
event_assign(
&raw_event_, base, fd_,
EV_PERSIST | (trigger_ == FileTriggerType::Level ? 0 : EV_ET) |
(events & FileReadyType::Read ? EV_READ : 0) |
(events & FileReadyType::Write ? EV_WRITE : 0) |
(events & FileReadyType::Closed ? EV_CLOSED : 0),
[](evutil_socket_t, short what, void* arg) -> void {
auto* event = static_cast<FileEventImpl*>(arg);
uint32_t events = 0;
// ... skip something
},
this);
}
It looks like the event assignment happens here, so we set a breakpoint and run again. From the call stack we can clearly see the path onAccept -> newConnection -> assignEvents; the events are registered when the client socket is created, and the only reason this is hard to spot is that the call stack of the whole flow is very deep. The rest of the registration logic lives inside libevent itself, so we won't go deeper there; a small standalone sketch of what this registration boils down to is shown below.
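At the libevent level, what assignEvents amounts to is an event_assign/event_add pair on the connection's fd. A standalone sketch of that registration (not Envoy code; a pipe stands in for the accepted connection's fd):

#include <cstdio>
#include <unistd.h>
#include <event2/event.h>
#include <event2/event_struct.h>

int main() {
  int fds[2];
  if (pipe(fds) != 0) return 1; // fds[0] plays the role of the accepted connection's fd

  event_base* base = event_base_new();

  // Roughly what FileEventImpl::assignEvents does: bind a persistent read event to the fd.
  event read_event;
  event_assign(&read_event, base, fds[0], EV_READ | EV_PERSIST,
               [](evutil_socket_t fd, short, void*) {
                 char buf[16];
                 ssize_t n = read(fd, buf, sizeof(buf));
                 std::printf("read event fired, got %zd bytes\n", n);
               },
               nullptr);
  event_add(&read_event, nullptr);

  // Simulate the peer sending data, then run a single loop iteration.
  ssize_t written = write(fds[1], "ping", 4);
  (void)written;
  event_base_loop(base, EVLOOP_ONCE);

  event_del(&read_event);
  event_base_free(base);
  close(fds[0]);
  close(fds[1]);
  return 0;
}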
Threading model
First, you may want to read the official post on the Envoy threading model.
In Envoy, libevent acts as a lower-level library dependency, much as Tomcat does when we write Spring Web applications: socket events such as reads and writes are delegated to libevent. The first step is to create a listen event on our event_base; then, once a connection is established, Envoy registers listeners for the read/write events of that socket.
In fact, if you set a breakpoint at assignEvents and restart, the first event registered is the read event on the configuration file (for hot configuration updates), followed by DNS event registration; after that the Envoy service starts and the various listeners from the configuration file are initialized. It is worth noting that LibeventScheduler::run is present in every worker.
Unlike Netty's approach, the listener here is also distributed across all worker threads; when a new connection arrives, one of the workers effectively ends up handling it. The sketch below shows that per-worker-loop shape.
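A minimal sketch of this shape (not Envoy code): every worker owns its own event_base and would run its own loop, which is why LibeventScheduler::run shows up once per worker.

#include <cstdio>
#include <thread>
#include <vector>
#include <event2/event.h>

int main() {
  const int num_workers = 4;
  std::vector<std::thread> workers;

  for (int i = 0; i < num_workers; ++i) {
    workers.emplace_back([i] {
      // Each worker owns its own event_base, just as each Envoy worker owns its own dispatcher.
      event_base* base = event_base_new();
      std::printf("worker %d: created its own event_base\n", i);
      // A real worker would register listener/connection events here and then block in
      // event_base_loop(base, 0); with no events registered we simply tear it down.
      event_base_free(base);
    });
  }
  for (auto& w : workers) w.join();
  return 0;
}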
Chain in Envoy
From the above we already know that when a client socket is accepted, its read/write events are registered on the event_base; inside the registered event handler we can easily locate ConnectionImpl::onRead, which is where the read logic lives.
Request handling chain
void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (!read_enabled_ || inDelayedClose()) {
return;
}
ASSERT(ioHandle().isOpen());
if (read_buffer_size == 0 && !read_end_stream_) {
return;
}
filter_manager_.onRead();
}
The last line hands control to the FilterManager, the starting point of the whole processing pipeline. A few jumps later we find where the core of the processing logic lies.
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
ReadBufferSource& buffer_source) {
std::list<ActiveReadFilterPtr>::iterator entry;
if (!filter) {
connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
entry = upstream_filters_.begin();
} else {
entry = std::next(filter->entry());
}
for (; entry != upstream_filters_.end(); entry++) {
if (!(*entry)->initialized_) {
(*entry)->initialized_ = true;
FilterStatus status = (*entry)->filter_->onNewConnection(); ➀
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}
StreamBuffer read_buffer = buffer_source.getReadBuffer();
if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream); ➁
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}
}
➀ Envoy also defines a set of abstract interfaces for filters. For example, Network::ReadFilter declares a series of virtual read-filter functions, and we can see that the first time a filter runs, onNewConnection is called: this is clearly a lifecycle callback, and what it actually does depends on the concrete implementation class. A hypothetical sketch of such a filter is shown below.
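As an illustration of those lifecycle callbacks, here is a hypothetical byte-counting read filter sketched against the Network::ReadFilter interface (the same interface the official echo example later in this article implements). It is a sketch against Envoy's public headers, not code from the Envoy tree.

#include <cstdint>

#include "envoy/buffer/buffer.h"
#include "envoy/network/filter.h"

namespace Envoy {
namespace Filter {

// Hypothetical filter: counts bytes read on the connection.
class ByteCounter : public Network::ReadFilter {
public:
  // Lifecycle callback: invoked once when the connection first enters the chain.
  Network::FilterStatus onNewConnection() override {
    bytes_ = 0;
    return Network::FilterStatus::Continue; // let the next filter run too
  }

  // Data callback: invoked for every chunk read from the socket.
  Network::FilterStatus onData(Buffer::Instance& data, bool) override {
    bytes_ += data.length();
    return Network::FilterStatus::Continue; // StopIteration would pause the chain here
  }

  void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
    read_callbacks_ = &callbacks;
  }

private:
  uint64_t bytes_{0};
  Network::ReadFilterCallbacks* read_callbacks_{};
};

} // namespace Filter
} // namespace Envoy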
➁ Here we find the logic that handles the actual read data. Since the focus of this article is HTTP handling, for the ReadFilter we will look at the implementation of Http::ConnectionManagerImpl.
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
bool redispatch;
do {
redispatch = false;
try {
codec_->dispatch(data); // Dispatch this data to the HTTP codec
} // (catch blocks handling codec exceptions omitted here)
if (codec_->protocol() < Protocol::Http2) {
if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
data.length() > 0 && streams_.empty()) {
redispatch = true;
}
if (!streams_.empty() && streams_.front()->state_.remote_complete_) {
read_callbacks_->connection().readDisable(true);
}
}
} while (redispatch);
if (!read_callbacks_->connection().streamInfo().protocol()) {
read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
}
return Network::FilterStatus::StopIteration; // return
}
From this implementation we can at least see that Envoy's filters are not like Java filters, which are invoked layer by layer with each one doing only its own job. Based on the code, a normal pass through the chain should visit every filter, so by that logic this step should do nothing more than decode the HTTP. In practice, however, the codec contains an optimization: once the HTTP request has been parsed, processing continues directly from inside the codec, jumping to the following.
void ServerConnectionImpl::onMessageComplete() {
if (active_request_.has_value()) {
auto& active_request = active_request_.value();
active_request.remote_complete_ = true;
if (deferred_end_stream_headers_) {
active_request.request_decoder_->decodeHeaders(
std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true); // ➀
deferred_end_stream_headers_ = false;
} else if (processing_trailers_) {
active_request.request_decoder_->decodeTrailers(
std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
} else {
Buffer::OwnedImpl buffer;
active_request.request_decoder_->decodeData(buffer, true);
}
// Reset to ensure no information from one requests persists to the next.
headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
}
http_parser_pause(&parser_, 1);
}
At ➀ we enter the routing stage (frankly, I don't think the design here is very clear, which is why the debugger was needed while reading the code), and we finally land in the ConfigImpl::route logic.
RouteConstSharedPtr RouteMatcher::route(const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& stream_info,
uint64_t random_value) const {
const VirtualHostImpl* virtual_host = findVirtualHost(headers);
if (virtual_host) {
return virtual_host->getRouteFromEntries(headers, stream_info, random_value);
} else {
return nullptr;
}
}
The next processing step is determined by whether a matching route is returned. Following the breakpoint further, we arrive at:
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers, bool end_stream) {
// omit....
decodeHeaders(nullptr, *request_headers_, end_stream);
// Reset it here for both global and overridden cases.
resetIdleTimer();
}
The ConnectionManagerImpl::ActiveStream::decodeHeaders code contains a lot of HTTP-header handling that we won't expand on here. Instead we step into the most important inner decodeHeaders call to continue our exploration; the logic there is rather involved, so let's work through it slowly.
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
// Look up the route; if there is none, return directly
route_ = callbacks_->route();
if (!route_) {
config_.stats_.no_route_.inc();
ENVOY_STREAM_LOG(debug, "no cluster match for URL '{}'", *callbacks_,
headers.Path()->value().getStringView());
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RouteNotFound);
return Http::FilterHeadersStatus::StopIteration;
}
// Find the routing object for this request
route_entry_ = route_->routeEntry();
// Get the configured Cluster, which is actually the address of the target
cluster_ = cluster->info();
// Get a connection from upstream in order to get the host
const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();
Http::ConnectionPool::Instance* http_pool = getHttpConnPool();
Upstream::HostDescriptionConstSharedPtr host;
ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);
// Push the data to be processed onto the stack
modify_headers_ = modify_headers;
UpstreamRequestPtr upstream_request =
std::make_unique<UpstreamRequest>(*this, std::make_unique<HttpConnPool>(*http_pool));
upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_);
upstream_requests_.front()->encodeHeaders(end_stream);
if (end_stream) {
onRequestComplete();
}
return Http::FilterHeadersStatus::StopIteration;
}
Inside upstream_requests_.front()->encodeHeaders(end_stream) there is a further step:
void UpstreamRequest::encodeHeaders(bool end_stream) {
ASSERT(!encode_complete_);
encode_complete_ = end_stream;
conn_pool_->newStream(this); // ➀
}
➀ Here we create the connection to the downstream (i.e. the backend service we forward to). Let's explore deeper into the code.
ConnPoolImplBase::ActiveClient::ActiveClient(ConnPoolImplBase& parent,
uint64_t lifetime_request_limit,
uint64_t concurrent_request_limit)
: parent_(parent), remaining_requests_(translateZeroToUnlimited(lifetime_request_limit)),
concurrent_request_limit_(translateZeroToUnlimited(concurrent_request_limit)),
connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })) {
Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(
parent_.dispatcher_, parent_.socket_options_, parent_.transport_socket_options_);
real_host_description_ = data.host_description_;
codec_client_ = parent_.createCodecClient(data);
codec_client_->addConnectionCallbacks(*this);
conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
parent_.host_->cluster().stats().upstream_cx_connect_ms_, parent_.dispatcher_.timeSource());
conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
parent_.host_->cluster().stats().upstream_cx_length_ms_, parent_.dispatcher_.timeSource());
connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout());
}
Here we can see that a downstream connection is indeed being created. But we know that a high-performance web server cannot block while calling downstream; the usual approach is to park the request once the downstream connection has been initiated and resume processing only when the downstream replies. A little later we indeed find the following:
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
TransportSocketPtr&& transport_socket,
StreamInfo::StreamInfo& stream_info, bool connected)
: ConnectionImplBase(dispatcher, next_global_id_++) /* ...other member initializers omitted... */ {
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_ = dispatcher_.createFileEvent(
ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
Event::FileReadyType::Read | Event::FileReadyType::Write);
transport_socket_->setTransportSocketCallbacks(*this);
// ... rest of constructor omitted
}
When the downstream connection is created, the read and write events for the downstream's reply data are registered on the event_base of the current worker thread. Let's take stock of the whole flow of this first half.
Up to this point we have covered most of the path from receiving a request to issuing the request downstream. There is further housekeeping after that, such as response timeouts, which we won't go into; instead we move on to the next stage: the response data processing path.
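Before moving on, the same non-blocking pattern can be expressed in plain libevent terms (a standalone sketch, unrelated to Envoy's classes; the backend address is made up): bufferevent_socket_connect returns immediately, and both the connect result and the backend's reply arrive later as events on the same event_base.

#include <cstdio>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/event.h>

// Read callback: the backend's reply arrives here as an event, never by blocking.
static void read_cb(bufferevent* bev, void*) {
  char buf[256];
  size_t n = bufferevent_read(bev, buf, sizeof(buf) - 1);
  buf[n] = '\0';
  std::printf("backend replied: %s\n", buf);
}

// Event callback: connect completion (or failure) is also just an event.
static void event_cb(bufferevent* bev, short events, void* arg) {
  event_base* base = static_cast<event_base*>(arg);
  if (events & BEV_EVENT_CONNECTED) {
    const char request[] = "GET / HTTP/1.0\r\n\r\n";
    bufferevent_write(bev, request, std::strlen(request)); // send once connected
  } else if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
    event_base_loopexit(base, nullptr); // backend closed or failed
  }
}

int main() {
  event_base* base = event_base_new();

  sockaddr_in sin{};
  sin.sin_family = AF_INET;
  sin.sin_port = htons(80);
  inet_pton(AF_INET, "127.0.0.1", &sin.sin_addr); // hypothetical backend address

  bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
  bufferevent_setcb(bev, read_cb, nullptr, event_cb, base);
  bufferevent_enable(bev, EV_READ | EV_WRITE);
  // Returns immediately; completion is reported via event_cb.
  bufferevent_socket_connect(bev, reinterpret_cast<sockaddr*>(&sin), sizeof(sin));

  event_base_dispatch(base);
  event_base_free(base);
  return 0;
}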
Response data processing path
For the response data, we know that the read and write events for the backend stream were registered on the event_base earlier. Let's see how the incoming data is handled: the entry point is still the onRead() function, and the logic that follows mirrors what we saw before, up to FilterManagerImpl::onRead; this time the concrete implementation class is CodecClient, and the path is comparatively simple, because a response body needs no routing decisions or similar work. When reading completes we enter the ConnectionImpl::onMessageCompleteBase function and head into the tail end of the flow. Let's see how the original caller is answered once all the data has been read from the response stream; control then reaches ClientConnectionImpl::onMessageComplete.
void ClientConnectionImpl::onMessageComplete() {
ENVOY_CONN_LOG(trace, "message complete", connection_);
if (pending_response_.has_value()) { // Only process when there is a pending response
if (connection_.state() == Network::Connection::State::Open) {
while (!connection_.readEnabled()) {
connection_.readDisable(false); // readDisable(false) re-enables reading on the connection
}
}
if (deferred_end_stream_headers_) {
response.decoder_->decodeHeaders(
std::move(absl::get<ResponseHeaderMapPtr>(headers_or_trailers_)), true);
deferred_end_stream_headers_ = false;
} else if (processing_trailers_) {
response.decoder_->decodeTrailers(
std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));
} else {
Buffer::OwnedImpl buffer;
response.decoder_->decodeData(buffer, true); // Deliver the final (empty) data chunk, signalling end of stream
}
// Reset to ensure no information from one requests persists to the next.
pending_response_.reset();
headers_or_trailers_.emplace<ResponseHeaderMapPtr>(nullptr);
}
}
During the response, decodeData is called and control eventually returns to ConnectionManagerImpl; after a further series of operations we reach the place where the data is finally written.
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
ASSERT(!end_stream || enable_half_close_);
if (write_end_stream_) {
ASSERT(data.length() == 0 && end_stream);
return;
}
if (through_filter_chain) {
current_write_buffer_ = &data;
current_write_end_stream_ = end_stream;
FilterStatus status = filter_manager_.onWrite(); // ➀ Enter the filter chain for the write stream
current_write_buffer_ = nullptr;
if (FilterStatus::StopIteration == status) { // Note that the write can be terminated early here
return;
}
}
write_end_stream_ = end_stream;
if (data.length() > 0 || end_stream) {
ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
write_buffer_->move(data); // ➁ Move the data into the write buffer
if (!connecting_) {
ASSERT(file_event_ != nullptr, "ConnectionImpl file event was unexpectedly reset");
file_event_->activate(Event::FileReadyType::Write);
}
}
}
After all the data has been written, we enter the final cleanup process.
void ConnectionImpl::closeSocket(ConnectionEvent close_type) {
ENVOY_CONN_LOG(debug, "closing socket: {}", *this, static_cast<uint32_t>(close_type));
transport_socket_->closeSocket(close_type);
// Drain input and output buffers.
updateReadBufferStats(0, 0);
updateWriteBufferStats(0, 0);
write_buffer_->drain(write_buffer_->length());
connection_stats_.reset();
file_event_.reset();
socket_->close();
raiseEvent(close_type);
}
This concludes this part; let's now piece together the whole flow.
Envoy Extensions
From the analysis above we have more or less read through the main implementation, so what remains is to look at the part of Envoy that is set aside for extensions. For extending Envoy to custom use cases, the configuration file itself already gives us some hints.
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 127.0.0.1, port_value: 10000 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager
          stat_prefix: ingress_http
          codec_type: AUTO
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route: { cluster: some_service }
          http_filters:
          - name: envoy.filters.http.router
We can see that the most important part for Envoy is filter_chains. Even without having written C++ before, by analogy we can expect to be able to implement our own Filter and register it into this filter chain; fortunately the project provides a fairly detailed official demo. From it we can see that the core task is to inherit from one of the existing filter interfaces and implement it.
namespace Envoy {
namespace Filter {
/**
* Implementation of a basic echo filter.
*/
class Echo2 : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter> {
public:
// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override;
Network::FilterStatus onNewConnection() override { return Network::FilterStatus::Continue; }
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;
}
private:
Network::ReadFilterCallbacks* read_callbacks_{};
};
} // namespace Filter
} // namespace Envoy
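To make a filter such as Echo2 addressable by name from filter_chains, the official example also registers a config factory. Roughly, following the envoy-filter-example repository (the exact base-class and macro signatures vary between Envoy versions, so treat this as a sketch rather than the authoritative code):

#include "echo2.h"

#include "envoy/registry/registry.h"
#include "envoy/server/filter_config.h"

namespace Envoy {
namespace Server {
namespace Configuration {

// Factory that the config loader looks up by name when it builds the filter chain.
class Echo2ConfigFactory : public NamedNetworkFilterConfigFactory {
public:
  Network::FilterFactoryCb createFilterFactoryFromProto(const Protobuf::Message&,
                                                        FactoryContext&) override {
    // The returned callback is what buildFilterChain invokes for every new connection;
    // it adds our filter to that connection's FilterManager.
    return [](Network::FilterManager& filter_manager) -> void {
      filter_manager.addReadFilter(std::make_shared<Filter::Echo2>());
    };
  }

  ProtobufTypes::MessagePtr createEmptyConfigProto() override {
    return ProtobufTypes::MessagePtr{new ProtobufWkt::Empty()};
  }

  std::string name() const override { return "echo2"; }
};

// Static registration: this is how the filter becomes visible to the listener config.
REGISTER_FACTORY(Echo2ConfigFactory, NamedNetworkFilterConfigFactory);

} // namespace Configuration
} // namespace Server
} // namespace Envoy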
It is also relatively easy to use.
static_resources:
  clusters:
  - name: cluster_0
    connect_timeout: 0.25s
    load_assignment:
      cluster_name: cluster_0
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 0
But how does this mechanism work behind the scenes? We still have to go back to the source code. The existing extension implementations can be found under extensions. From the code above we can easily locate the logic that initializes the FilterChain, in ListenerImpl::createNetworkFilterChain.
bool ListenerImpl::createNetworkFilterChain(
Network::Connection& connection,
const std::vector<Network::FilterFactoryCb>& filter_factories) {
return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
}
As you can see, this is a very standard factory pattern. Each factory creates its own Filter instances; after creation, a ReadFilter is appended to the end of the upstream_filters_ list and a WriteFilter to the end of the downstream_filters_ list. The code is also quite simple, as shown below.
void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
ASSERT(connection_.state() == Connection::State::Open);
ActiveReadFilterPtr new_filter(new ActiveReadFilter{*this, filter});
filter->initializeReadFilterCallbacks(*new_filter);
new_filter->moveIntoListBack(std::move(new_filter), upstream_filters_);
}
The question then becomes where these filter factories come from. In the virtual table we find two implementations, Admin and Impl; if you have read the official documentation you can guess that the Admin implementation is the one used for the admin (dynamic) requests. It is easier to restart and trace the process from the beginning: filter-factory initialization can be traced back as early as ListenerManagerImpl::addOrUpdateListener, and the hierarchy mirrors the configuration file: we first create the Listener, and in the course of creating the Listener we create its FilterChain.
filter_chain_manager_.addFilterChain(config.filter_chains(), builder, filter_chain_manager_);
void FilterChainManagerImpl::addFilterChain(
absl::Span<const envoy::config::listener::v3::FilterChain* const> filter_chain_span,
FilterChainFactoryBuilder& filter_chain_factory_builder,
FilterChainFactoryContextCreator& context_creator) {
Cleanup cleanup([this]() { origin_ = absl::nullopt; });
std::unordered_set<envoy::config::listener::v3::FilterChainMatch, MessageUtil, MessageUtil>
filter_chains;
uint32_t new_filter_chain_size = 0;
for (const auto& filter_chain : filter_chain_span) {
const auto& filter_chain_match = filter_chain->filter_chain_match();
if (!filter_chain_match.address_suffix().empty() || filter_chain_match.has_suffix_len()) {
throw EnvoyException(fmt::format("error adding listener '{}': contains filter chains with "
"unimplemented fields",
address_->asString()));
}
if (filter_chains.find(filter_chain_match) != filter_chains.end()) {
throw EnvoyException(fmt::format("error adding listener '{}': multiple filter chains with "
"the same matching rules are defined",
address_->asString()));
}
filter_chains.insert(filter_chain_match);
// Validate IP addresses.
std::vector<std::string> destination_ips;
destination_ips.reserve(filter_chain_match.prefix_ranges().size());
for (const auto& destination_ip : filter_chain_match.prefix_ranges()) {
const auto& cidr_range = Network::Address::CidrRange::create(destination_ip);
destination_ips.push_back(cidr_range.asString());
}
std::vector<std::string> source_ips;
source_ips.reserve(filter_chain_match.source_prefix_ranges().size());
for (const auto& source_ip : filter_chain_match.source_prefix_ranges()) {
const auto& cidr_range = Network::Address::CidrRange::create(source_ip);
source_ips.push_back(cidr_range.asString());
}
// Reject partial wildcards, we don't match on them.
for (const auto& server_name : filter_chain_match.server_names()) {
if (server_name.find('*') != std::string::npos &&
!FilterChainManagerImpl::isWildcardServerName(server_name)) {
throw EnvoyException(
fmt::format("error adding listener '{}': partial wildcards are not supported in "
"\"server_names\"",
address_->asString()));
}
}
// Reuse created filter chain if possible.
// FilterChainManager maintains the lifetime of FilterChainFactoryContext
// ListenerImpl maintains the dependencies of FilterChainFactoryContext
auto filter_chain_impl = findExistingFilterChain(*filter_chain);
if (filter_chain_impl == nullptr) {
filter_chain_impl =
filter_chain_factory_builder.buildFilterChain(*filter_chain, context_creator);
++new_filter_chain_size;
}
addFilterChainForDestinationPorts(
destination_ports_map_,
PROTOBUF_GET_WRAPPED_OR_DEFAULT(filter_chain_match, destination_port, 0), destination_ips,
filter_chain_match.server_names(), filter_chain_match.transport_protocol(),
filter_chain_match.application_protocols(), filter_chain_match.source_type(), source_ips,
filter_chain_match.source_ports(), filter_chain_impl);
fc_contexts_[*filter_chain] = filter_chain_impl;
}
convertIPsToTries();
ENVOY_LOG(debug, "new fc_contexts has {} filter chains, including {} newly built",
fc_contexts_.size(), new_filter_chain_size);
}
Envoy Admin
When we build the example, the configuration contains this section:
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 127.0.0.1, port_value: 9901 }
This is the administration port for Envoy. It mainly exposes a series of statistics and other operational endpoints; see Envoy-Admin for details.
Envoy Dynamic Manager
We have analyzed the simple case of reading configuration from a file, but for a full-fledged gateway, dynamically loaded configuration is indispensable, so Envoy also provides several mechanisms for obtaining configuration dynamically, such as LDS and the rest of the xDS family.
LDS
The listener discovery service (LDS) is an optional API that Envoy will call to dynamically fetch listeners. Envoy will reconcile the API response and add, modify, or remove known listeners depending on what is required.