Background
Today, I received feedback from the business team that an online application failed to send a message to Pulsar, and after checking the logs, I learned that a java.lang.InterruptedException
exception was thrown when sending the message.
After communicating with the business, we learned that the message sending was triggered in a gRPC
interface, and the exception lasted about half an hour before returning to normal, which is the background of the whole problem.
Troubleshooting
After getting the issue, we first checked whether it was a common problem, and checked other applications without finding similar exceptions; we also checked the monitoring of the Pulsar broker, and there were still no fluctuations and exceptions in this time period.
This can be initially ruled out as a problem of the Pulsar server.
The next step is to check the load of the application during that time, from the application QPS to the memory situation of each JVM, there is still no significant change.
Pulsar source code troubleshooting
Since it seems that the application itself and the Pulsar broker are fine, we have to look at the exception itself to troubleshoot it.
The first step is to find out what version of Pulsar-client
is being used, because the business uses the official SDK-based springboot starter
, so the first step is to check whether this starter
has any impact.
By looking at the source code basically ruled out the suspicion of starter
, which simply encapsulates the function of SDK
.
|
|
The next step is to analyze the stack, because some of the source code of Pulsar-client is not directly packaged into the dependencies, and many lines of code are not correct if decompiled, so we need to pull the official source code locally and switch to the for branch to view it.
I switched the branch to branch-2.8
directly here.
Starting at the top of the stack to troubleshoot TypedMessageBuilderImpl.java:91
:
It looks like an exception was thrown when the message was sent internally asynchronously.
Moving on, see here.
It seems to be correct here, but the number of lines of code is obviously not correct; since this branch of 2.8 has also been fixed for several versions, it is normal for the number of lines of code to be inconsistent with the latest code due to changes in the middle.
|
|
In order to confirm whether it is really this line of code, I turned the file forward a few versions to finally confirm that it is this line of code is correct.
Let’s open the source code for java.util.concurrent.Semaphore#acquire()
.
|
|
The source code shows that the acquire()
function does respond to interrupts, and throws an InterruptedException
once it detects that the current thread has been interrupted.
Locating the problem
So the cause of the problem is basically determined, that is, the thread sending the message in Pulsar is interrupted, but why it is interrupted needs to be investigated.
We know that thread interrupt is required to call the Thread.currentThread().interrupt();
API, so we first guess whether a thread inside the Pulsar client interrupted the sending thread.
So I searched for the code in the module pulsar-client
.
Excluding anything not related to the producer, all the rest of the code that interrupts the thread just continues to pass after the exception; so it seems that pulsar-client does not actively interrupt internally.
Since Pulsar itself does not, it is only possible that the business code is doing the interrupting?
So I did a search in the business code.
Sure enough, I found the only interrupt in the business code, and the call relationship tells me that this code is executed before the message is sent, and is in the same thread as the Pulsar send function.
The approximate pseudo-code is as follows.
|
|
Executing this code reproduces the same stack exactly.
Fortunately, there is also an output log here for interrupts.
Searching through the logs, I found that the time of the exception and the time point of the log of this interrupt coincide exactly, so I also know the root cause.
Because the business thread and the message sending thread are the same, in some cases Thread.currentThread().interrupt();
, in fact, simply executing this line of function does not happen, as long as there is no response to this interrupt, that is, the Semaphore
source code in the judgment of the thread interrupt marker.
But it just so happens that the business interruption here did not determine this flag itself, causing Pulsar to determine it internally and eventually throwing this exception.
Summary
So in the final analysis, the code here is not reasonable, first of all, they interrupt the thread but also did not use, which leads to the possibility of being used by other base libraries, so it will cause some unpredictable consequences.
Another is that it is not recommended to use Thread.currentThread().interrupt();
in the business code, the first look does not know what to do, but also not easy to maintain.
In fact, the essence of thread interrupt is also a means of inter-thread communication, with such needs can be completely replaced by built-in BlockQueue
functions to achieve.
Ref
https://crossoverjie.top/2023/02/23/pulsar/pulsar-interrupted/