Agrona is a Java toolkit developed by Real Logic that provides a number of high-performance data structures and utility methods, including:
- Buffers - Thread safe direct and atomic buffers for working with on and off heap memory with memory ordering semantics.
- Lists - Array backed lists of int/long primitives to avoid boxing.
- Maps - Open addressing and linear probing with int/long primitive keys to object reference values.
- Maps - Open addressing and linear probing with int/long primitive keys to int/long values.
- Sets - Open addressing and linear probing for int/long primitives and object references.
- Cache - Set Associative with int/long primitive keys to object reference values.
- Clocks - Clock implementations to abstract system clocks, allow caching, and enable testing.
- Queues - Lock-less implementations for low-latency applications.
- Ring/Broadcast Buffers - implemented off-heap for IPC communication.
- Simple Agent framework for concurrent services.
- Signal handling to support “Ctrl + c” in a server application.
- Scalable Timer Wheel - For scheduling timers at a given deadline with O(1) register and cancel time.
- Code generation from annotated implementations specialised for primitive types.
- Off-heap counters implementation for application telemetry, position tracking, and coordination.
- Implementations of InputStream and OutputStream that can wrap direct buffers.
- DistinctErrorLog - A log of distinct errors to avoid filling disks with existing logging approaches.
- IdGenerator - Concurrent and distributed unique id generator employing a lock-less implementation of the Twitter Snowflake algorithm.
I. Agents
1.1 Duty Cycle
A duty cycle is a programming model built around an endless loop: each iteration executes some logic and, depending on the result, decides whether to wait a while before the next iteration.
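The loop described above can be sketched with the plain JDK. This is a minimal illustration, not Agrona code: the class name `DutyCycleSketch`, the `run` method and its parameters are assumptions made for this example, and the loop is bounded by an iteration count only so that it terminates.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.IntSupplier;

final class DutyCycleSketch {
    /**
     * Runs the work function for a fixed number of iterations and parks briefly
     * whenever an iteration reports that no work was done.
     * Returns how many iterations were idle.
     */
    static int run(IntSupplier work, int iterations, long idleNanos) {
        int idleCount = 0;
        // A real duty cycle would loop until the service is shut down.
        for (int i = 0; i < iterations; i++) {
            int workCount = work.getAsInt();
            if (workCount <= 0) {
                idleCount++;
                LockSupport.parkNanos(idleNanos); // wait a while before the next cycle
            }
        }
        return idleCount;
    }
}
```

The work function returns a count so that the loop can tell a busy iteration from an idle one, which is the same decision the Agent model below is built on.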
1.2 Agent
Agrona defines the Agent interface around this model. Its doWork() method handles the business logic and returns a value that determines whether Agrona executes the idle strategy:
- When the return value is greater than 0, the idle strategy is not triggered and Agrona executes the next doWork() immediately
- When the return value is less than or equal to 0, the specified idle strategy is executed
In addition, onStart() and onClose() serve as callback hooks when the Agent is started and closed, and roleName() returns the name of the Agent.
1.3 Idle Strategies
Agrona natively provides a number of idle strategies.
Name | Implementation Details |
---|---|
SleepingIdleStrategy | Suspends the thread using LockSupport.parkNanos |
SleepingMillisIdleStrategy | Suspends the thread using Thread.sleep; intended for local development on low-spec machines or when running a large number of processes |
YieldingIdleStrategy | Gives up the CPU using Thread.yield() |
BackoffIdleStrategy | An aggressive strategy that spins, then yields (Thread.yield()), and then parks (parkNanos) for the configured time; this is the default strategy for Aeron Cluster |
NoOpIdleStrategy | The most aggressive strategy: it does nothing when idle |
BusySpinIdleStrategy | On Java 9 and above, Thread.onSpinWait() is used. This hints to the CPU that the thread is busy-waiting in a tight loop, so the CPU may allocate additional resources to another thread without involving the OS scheduler. |
To customize the idle strategy, you only need to implement the IdleStrategy interface.
The idle strategies above are not necessarily thread safe, so it is recommended that each Agent use its own idle strategy instance.
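To make the spin, yield, park progression concrete, here is a rough sketch in plain Java. It does not implement Agrona's IdleStrategy interface: `IdleSketch`, `SpinYieldParkIdle`, `Phase` and the constructor parameters are hypothetical names chosen for this example, and the thresholds are simple call counts rather than Agrona's actual configuration.

```java
import java.util.concurrent.locks.LockSupport;

/** Hypothetical stand-in for the shape of an idle strategy; not Agrona's own interface. */
interface IdleSketch {
    void idle(int workCount);
}

/** Spins first, then yields, then parks; any useful work resets the sequence. */
final class SpinYieldParkIdle implements IdleSketch {
    enum Phase { ACTIVE, SPIN, YIELD, PARK }

    private final int maxSpins;
    private final int maxYields;
    private final long parkNanos;
    private int idleCalls; // plain field: an instance must not be shared between threads

    SpinYieldParkIdle(int maxSpins, int maxYields, long parkNanos) {
        this.maxSpins = maxSpins;
        this.maxYields = maxYields;
        this.parkNanos = parkNanos;
    }

    @Override
    public void idle(int workCount) {
        if (workCount > 0) {
            idleCalls = 0; // work was done, so start again from the cheapest phase
            return;
        }
        if (idleCalls < Integer.MAX_VALUE) {
            idleCalls++;
        }
        switch (phase()) {
            case SPIN: Thread.onSpinWait(); break;
            case YIELD: Thread.yield(); break;
            case PARK: LockSupport.parkNanos(parkNanos); break;
            default: break;
        }
    }

    /** Phase chosen by the most recent call to idle(). */
    Phase phase() {
        if (idleCalls == 0) return Phase.ACTIVE;
        if (idleCalls <= maxSpins) return Phase.SPIN;
        if (idleCalls <= (long) maxSpins + maxYields) return Phase.YIELD;
        return Phase.PARK;
    }
}
```

The unsynchronized `idleCalls` counter is the kind of internal state that makes sharing one strategy instance between Agents unsafe.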
1.4 Agent Runner
The AgentRunner combines an Agent with an idle strategy and runs them.
The AgentRunner constructor takes the following parameters:
parameters | meaning |
---|---|
idleStrategy | IdleStrategy instance object |
errorHandler | The callback handler when an exception occurs during the Agent’s execution |
errorCounter | Records the number of exceptions that occurred during the Agent’s execution |
agent | Agent instance object |
Once you have an AgentRunner, Agrona provides the following three ways to start it.
- AgentRunner#startOnThread(AgentRunner), which creates a thread and runs the Agent on it
- AgentRunner#startOnThread(AgentRunner, ThreadFactory), which creates the thread using the specified ThreadFactory
- Form a CompositeAgent from multiple Agents and call either of the two methods above; these Agents then share a single thread
1.5 Agent Invoker
If we want to drive the Agent manually, Agrona provides the AgentInvoker.
Compared with AgentRunner, its constructor drops the idle strategy parameter: the Agent is invoked manually by the caller, so this parameter is not needed.
II. Clocks
Agrona provides its own clock API. It is based on epoch time, the time elapsed since 1970-01-01 00:00:00.000 UTC. The top-level interface is EpochClock, and there are two implementations: SystemEpochClock and CachedEpochClock.
SystemEpochClock returns the elapsed time in milliseconds. It is a thin wrapper around System.currentTimeMillis() and provides a static instance.
CachedEpochClock holds a cached time value and offers several methods.
- update(long timeMs) sets the cached value to timeMs
- advance(long timeMs) adds timeMs to the cached value
- time() returns the cached value
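The three methods above can be mirrored in a few lines of plain Java. This is only a sketch of the idea: `CachedClockSketch` is a hypothetical class written for this example and says nothing about how CachedEpochClock is implemented internally.

```java
/** Hypothetical cached clock: time only moves when the owner says so. */
final class CachedClockSketch {
    private volatile long timeMs;

    /** Replaces the cached value. */
    void update(long timeMs) {
        this.timeMs = timeMs;
    }

    /** Adds to the cached value; read-modify-write, so a single writer is assumed. */
    void advance(long deltaMs) {
        timeMs += deltaMs;
    }

    /** Returns the cached value without touching the system clock. */
    long time() {
        return timeMs;
    }
}
```

Because the value changes only through update and advance, a test can move time forward deterministically instead of sleeping.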
In addition, Agrona provides microsecond and nanosecond APIs:
- SystemEpochMicroClock is based on the java.time.Instant API
- SystemEpochNanoClock is based on the java.time.Instant API
- OffsetEpochNanoClock samples System.nanoTime() at intervals, and the sampling interval and other parameters can be adjusted as needed
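The sampling idea behind an offset-based nanosecond clock can be sketched as follows. This is a simplified model under stated assumptions, not Agrona's algorithm: `OffsetNanoClockSketch`, `resample` and the fixed sample count of 5 are all made up for illustration, and the result is only as precise as System.currentTimeMillis() allows.

```java
/** Hypothetical epoch clock in nanoseconds: System.nanoTime() plus a sampled offset. */
final class OffsetNanoClockSketch {
    private long offsetNanos;

    OffsetNanoClockSketch() {
        resample();
    }

    /** Brackets a wall-clock read between two nanoTime reads and keeps the tightest sample. */
    void resample() {
        long bestWindow = Long.MAX_VALUE;
        for (int i = 0; i < 5; i++) {
            long before = System.nanoTime();
            long epochMs = System.currentTimeMillis();
            long after = System.nanoTime();
            long window = after - before;
            if (window < bestWindow) {
                bestWindow = window;
                long midpoint = before + window / 2;
                offsetNanos = epochMs * 1_000_000L - midpoint;
            }
        }
    }

    /** Epoch time in nanoseconds, derived from the monotonic clock. */
    long nanoTime() {
        return System.nanoTime() + offsetNanos;
    }
}
```

Calling `resample()` periodically would correct for drift between the two clocks; how often to do so is the kind of parameter the text above says can be tuned.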
III. RingBuffer
The author of Aeron developed the Disruptor during his tenure at LMAX, and Agrona also provides support for this data structure.
3.1 OneToOneRingBuffer
OneToOneRingBuffer targets single-producer, single-consumer scenarios. Unlike the RingBuffer in Disruptor, the buffer size must include an additional RingBufferDescriptor.TRAILER_LENGTH, and the ByteBuffer API used for allocation determines whether the buffer lives on or off the heap.
The following code shows the creation of a OneToOneRingBuffer of size 4096, using an off-heap allocation buffer.
MessageHandler
When consuming data, you need to implement the MessageHandler interface.
The msgType field is the identifier of the message and is stored in the message header. If this field is not used, it must still be set to a value greater than 0.
RingBuffer#write
When producing data, call the RingBuffer’s write method.
The boolean result, here called sentOk, indicates whether the write succeeded. A failed write can be used to apply backpressure, so that the producer does not outrun the consumer. The RingBuffer also exposes producerPosition() and consumerPosition() to show the current production and consumption positions.
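The backpressure idea can be illustrated without Agrona by letting a bounded JDK queue stand in for the ring buffer: `offer` returning false plays the role of a failed write. `BackpressureSketch`, `offerWithRetry` and `maxAttempts` are hypothetical names for this sketch, and a real producer might idle, drop the message or signal upstream instead of spinning.

```java
import java.util.concurrent.BlockingQueue;

final class BackpressureSketch {
    /**
     * Tries to hand a message to a bounded queue, spinning briefly between attempts.
     * Returns false when the queue stayed full, so the caller can slow down.
     */
    static <T> boolean offerWithRetry(BlockingQueue<T> queue, T message, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (queue.offer(message)) {
                return true; // accepted, the equivalent of a successful write
            }
            Thread.onSpinWait(); // consumer is behind; give it a moment to catch up
        }
        return false;
    }
}
```

With an `ArrayBlockingQueue` of capacity 1 that is already full, the call returns false until the consumer polls an element.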
ControlledMessageHandler
In addition to MessageHandler, the ControlledMessageHandler interface can also be used to consume from the RingBuffer.
The difference is that its onMessage() method returns a ControlledMessageHandler.Action:
- ABORT : This aborts the read operation for this message. It will be delivered again on next read
- BREAK : This stops further processing after the current message for this read.
- COMMIT : Continues processing, but commits at the current message.
- CONTINUE : Continues processing, committing at the end of the current batch (this is equivalent to the standard handler).
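A toy model can make the four actions easier to compare. The sketch below reads from a plain list rather than a ring buffer and records every point at which the consumer position would become visible. It is an assumption-laden illustration, not Agrona's read loop: `ControlledReadSketch`, its `Action` enum, `publishedHeads` and `read` are all names invented here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ControlledReadSketch {
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    /** Every head position that was made visible, in order. */
    final List<Integer> publishedHeads = new ArrayList<>();
    int head;

    /** Handles up to limit messages starting at head; returns how many were consumed. */
    int read(List<String> messages, int limit, Function<String, Action> handler) {
        int next = head;
        int handled = 0;
        while (handled < limit && next < messages.size()) {
            Action action = handler.apply(messages.get(next));
            if (action == Action.ABORT) {
                break; // message is not consumed and will be seen again on the next read
            }
            next++;
            handled++;
            if (action == Action.BREAK) {
                break; // message is consumed, but the batch ends here
            }
            if (action == Action.COMMIT) {
                publish(next); // progress becomes visible immediately
            }
        }
        if (next != head) {
            publish(next); // CONTINUE and BREAK commit once, at the end of the batch
        }
        return handled;
    }

    private void publish(int newHead) {
        head = newHead;
        publishedHeads.add(newHead);
    }
}
```

In this model, a handler that always returns COMMIT publishes after every message, while one that always returns CONTINUE publishes once per batch.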
TryClaim
When writing data, you can also use the tryClaim() method to manipulate the underlying RingBuffer data structure directly.
First, call tryClaim() to get the index that can be written, then get the underlying buffer of the RingBuffer and write data to it, and finally call commit or abort to finish.
3.2 ManyToOneRingBuffer
The API is consistent with OneToOneRingBuffer and supports multi-producer scenarios.
3.3 Broadcast
OneToOneRingBuffer and ManyToOneRingBuffer both target single-consumer scenarios. If multiple consumers are needed, Agrona provides BroadcastTransmitter and BroadcastReceiver.
Note in particular that under Broadcast, messages are discarded if the sender is producing faster than the consumer can consume (no backpressure support).
IV. Data Structures
Agrona provides a number of collection data structures that avoid the overhead of boxing and unboxing primitive types in a collection.
4.1 HashMaps
When debugging in an IDE, Agrona HashMaps may appear to contain the wrong elements. To work around this, set shouldAvoidAllocation to false in the constructor. Agrona then turns off caching, but this also results in more GC activity.
Collection | Notes |
---|---|
Int2IntHashMap | HashMap of <int, int> |
Int2NullableObjectHashMap | HashMap of <int, nullable object> |
Int2ObjectHashMap | HashMap of <int, object> |
Long2LongHashMap | HashMap of <long, long> |
Long2NullableObjectHashMap | HashMap of <long, nullable object>; a null value is represented by NullReference inside the collection |
Long2ObjectHashMap | HashMap of <long, object> |
Object2IntHashMap | HashMap of <object, int> |
Object2NullableObjectHashMap | HashMap of <object, nullable object> |
Object2ObjectHashMap | HashMap of <object, object> |
When using these HashMaps, make sure that the hashCode() of the elements is correct and well distributed; the performance of the collection can degrade significantly if hash codes collide heavily.
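The cost of collisions under open addressing with linear probing can be seen in a small model. The sketch below is a deliberately naive int-to-int map written for this article: `IntIntProbeMapSketch` and its `probes` counter are hypothetical, there is no resizing or removal, and the identity hash is chosen only so that collisions are easy to provoke. It is not how Int2IntHashMap is implemented.

```java
/** Hypothetical fixed-capacity int-to-int map using open addressing and linear probing. */
final class IntIntProbeMapSketch {
    private final int[] keys;
    private final int[] values;
    private final boolean[] used;
    private final int mask;
    long probes; // extra slots inspected by put() because of collisions

    IntIntProbeMapSketch(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        keys = new int[capacity];
        values = new int[capacity];
        used = new boolean[capacity];
        mask = capacity - 1;
    }

    void put(int key, int value) {
        int index = key & mask; // identity hash, for illustration only
        for (int i = 0; i <= mask; i++) {
            if (!used[index]) {
                used[index] = true;
                keys[index] = key;
                values[index] = value;
                return;
            }
            if (keys[index] == key) {
                values[index] = value;
                return;
            }
            index = (index + 1) & mask; // slot taken by another key: try the next one
            probes++;
        }
        throw new IllegalStateException("map is full");
    }

    int get(int key, int missingValue) {
        int index = key & mask;
        for (int i = 0; i <= mask; i++) {
            if (!used[index]) {
                return missingValue;
            }
            if (keys[index] == key) {
                return values[index];
            }
            index = (index + 1) & mask;
        }
        return missingValue;
    }
}
```

With capacity 16, the keys 0, 16 and 32 all land on slot 0 and each insert has to walk past the previous ones, whereas the keys 0, 1 and 2 need no probing at all.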
4.2 Caches
Collection | Notes |
---|---|
Int2ObjectCache | Cache with primitive int lookup to an object. Tuned for very small data structures stored within CPU cache lines. Typical sizes are 2 to 16 entries. Underlying storage is an array. |
IntLruCache | Fixed-size cache that uses an LRU policy to evict the least recently used entry when the limit is reached |
4.3 HashSets
Collection | Notes |
---|---|
IntHashSet | HashSet of primitive int values; grows automatically. |
ObjectHashSet | HashSet of object references; grows automatically. |
V. Direct Buffer
Agrona’s use of the sun.misc.Unsafe and sun.nio.ch.SelectorImpl.selectedKeys APIs may cause the JVM to print warnings about illegal reflective access at startup. To remove the warnings, add the JVM parameters: --add-opens java.base/sun.nio.ch=ALL-UNNAMED --add-opens jdk.unsupported/sun.misc=ALL-UNNAMED
Agrona defines the DirectBuffer interface for interacting with Aeron. It is somewhat similar to the Java NIO ByteBuffer, but a bit more convenient.
Name | Implementation Details |
---|---|
UnsafeBuffer | A fixed-size buffer, for example over off-heap memory; throws an IndexOutOfBoundsException when the size is exceeded. |
ExpandableDirectByteBuffer | An expandable direct buffer backed by a direct ByteBuffer. It defaults to 128 bytes, and the initial size can be set via the constructor. When the size is exceeded, a new ByteBuffer is created and the existing contents are copied into it. |
ExpandableArrayBuffer | An expandable buffer backed by a byte array (new byte[size]). It defaults to 128 bytes; when the size is exceeded, a new byte[] is created and the existing contents are copied in. |
5.1 Key Concepts
Agrona uses ByteOrder.nativeOrder() by default, and reading with a different byte order from the one used for writing leads to incorrect results. This can occur in cross-OS and cross-platform interactions.
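The effect of mismatched byte orders is easy to reproduce with the JDK's own ByteBuffer, used here as a stand-in for a DirectBuffer. `ByteOrderSketch` and `writeThenRead` are names made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ByteOrderSketch {
    /** Writes an int with one byte order and reads it back with another. */
    static int writeThenRead(int value, ByteOrder writeOrder, ByteOrder readOrder) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
        buffer.order(writeOrder).putInt(0, value);
        return buffer.order(readOrder).getInt(0);
    }
}
```

Writing 1 as little-endian and reading it as big-endian yields 16777216 (0x01000000), the byte-reversed value, which is why both sides of an exchange need to agree on the order.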
Reads are addressed by offset and length. For a buffer of 13 bytes, extracting the 4 bytes that start at index 4 means setting the offset to 4 and the read length to 4.
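The offset and length addressing for the 13-byte case can be sketched with a JDK ByteBuffer. `OffsetLengthSketch` and `slice` are hypothetical names for this illustration; a DirectBuffer offers its own methods for the same job.

```java
import java.nio.ByteBuffer;

final class OffsetLengthSketch {
    /** Copies length bytes starting at offset, using absolute reads that leave the position alone. */
    static byte[] slice(ByteBuffer buffer, int offset, int length) {
        byte[] out = new byte[length];
        for (int i = 0; i < length; i++) {
            out[i] = buffer.get(offset + i);
        }
        return out;
    }
}
```

For a 13-byte buffer holding the values 0 to 12, `slice(buffer, 4, 4)` returns the bytes 4, 5, 6 and 7.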
5.2 Working with Types
5.2.1 Chars & Bytes
DirectBuffer provides methods to read and write single bytes or 16-bit characters.
5.2.2 Shorts, Integers & Longs
DirectBuffer provides support for reading and writing short, int, and long data. For int and long, additional utility methods are provided for compare-and-set, get-and-add, and get-and-set.
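For readers without Agrona on the classpath, the same three kinds of atomic operation can be tried on a direct ByteBuffer through a JDK VarHandle (Java 9 and above). This is a plain-JDK analogue, not Agrona's API: `AtomicIntOpsSketch` and its method names are assumptions for this sketch, and the index is a byte offset that must be aligned to 4 bytes.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class AtomicIntOpsSketch {
    // Views a ByteBuffer as ints in native byte order; coordinates are (buffer, byte index).
    private static final VarHandle INT =
        MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());

    /** Sets the int at index to update only if it currently equals expected. */
    static boolean compareAndSet(ByteBuffer buffer, int index, int expected, int update) {
        return INT.compareAndSet(buffer, index, expected, update);
    }

    /** Atomically adds delta and returns the previous value. */
    static int getAndAdd(ByteBuffer buffer, int index, int delta) {
        return (int) INT.getAndAdd(buffer, index, delta);
    }

    /** Atomically stores value and returns the previous value. */
    static int getAndSet(ByteBuffer buffer, int index, int value) {
        return (int) INT.getAndSet(buffer, index, value);
    }

    /** Reads the int at index with volatile semantics. */
    static int getVolatile(ByteBuffer buffer, int index) {
        return (int) INT.getVolatile(buffer, index);
    }
}
```

A buffer from `ByteBuffer.allocateDirect(8)` with index 0 or 4 is a safe starting point for experimenting.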
5.2.3 Floats & Doubles
Using float and double for data transfer is generally not recommended; prefer either a BigDecimal formatted as a string or a scaled long.
DirectBuffer provides methods for reading and writing float and double.
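A scaled long stores a decimal as an integer count of its smallest unit, for example 12.34 as 1234 with a scale of 2. The helper below is a small sketch of the conversion in both directions; `ScaledLongSketch`, `toScaled` and `fromScaled` are names invented for this example.

```java
import java.math.BigDecimal;

final class ScaledLongSketch {
    /** Converts a decimal string to a long holding value * 10^scale; throws if digits would be lost. */
    static long toScaled(String decimal, int scale) {
        return new BigDecimal(decimal).movePointRight(scale).longValueExact();
    }

    /** Converts a scaled long back to its decimal string form. */
    static String fromScaled(long scaled, int scale) {
        return BigDecimal.valueOf(scaled, scale).toPlainString();
    }
}
```

Arithmetic on the scaled values is exact: with a scale of 2, 0.1 plus 0.2 is 10 + 20 = 30, whereas the same sum in double does not equal 0.3.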
5.2.4 Strings
- putStringAscii and putStringUtf8 handle variable-length strings by writing a length prefix, which is less efficient.
- putStringWithoutLengthAscii and putStringWithoutLengthUtf8 write the string bytes without a length prefix, which suits fixed-length strings
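The difference between the two families comes down to whether a length is stored next to the bytes. The sketch below shows both layouts on a JDK ByteBuffer, assuming a 4-byte length prefix; `StringEncodingSketch` and its method names are hypothetical and only mirror the idea, not Agrona's methods.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class StringEncodingSketch {
    /** Writes a 4-byte length followed by the ASCII bytes; returns the bytes written. */
    static int putWithLength(ByteBuffer buffer, int index, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.US_ASCII);
        buffer.putInt(index, bytes.length);
        return Integer.BYTES + putBytes(buffer, index + Integer.BYTES, bytes);
    }

    /** Reads the length prefix first, then that many bytes. */
    static String getWithLength(ByteBuffer buffer, int index) {
        int length = buffer.getInt(index);
        return getWithoutLength(buffer, index + Integer.BYTES, length);
    }

    /** Writes only the ASCII bytes; the reader must already know the length. */
    static int putWithoutLength(ByteBuffer buffer, int index, String value) {
        return putBytes(buffer, index, value.getBytes(StandardCharsets.US_ASCII));
    }

    static String getWithoutLength(ByteBuffer buffer, int index, int length) {
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            bytes[i] = buffer.get(index + i);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    private static int putBytes(ByteBuffer buffer, int index, byte[] bytes) {
        for (int i = 0; i < bytes.length; i++) {
            buffer.put(index + i, bytes[i]);
        }
        return bytes.length;
    }
}
```

A fixed-width field such as a 6-character symbol can skip the prefix because both sides agree on the length in advance.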
VI. IdGenerator
Agrona also implements the Snowflake algorithm.
- The generated result is 64-bit data
- The data is roughly ordered
- Up to 4,096,000 IDs can be generated per second. If an attempt is made to generate more than this, the generator spins until the next available timestamp
- An IllegalStateException can be thrown if the system clock goes backwards
- Lock-free and thread-safe
When initializing the Snowflake generator, you need to provide a unique node ID; up to 1024 nodes are supported by default.
Note that by default it uses 1970 as the starting point, so it can only generate IDs up to 2039 (similar to epoch time). An overloaded constructor allows you to specify the starting time.
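The figures above (1024 nodes, 4,096,000 IDs per second, a range ending around 2039) follow from the usual Snowflake bit layout, assuming 41 timestamp bits in milliseconds, 10 node bits and 12 sequence bits. The sketch below only shows that arithmetic and how an ID is packed and unpacked; `SnowflakeLayoutSketch` is a hypothetical class, not Agrona's generator, and it contains none of the lock-free sequencing logic.

```java
final class SnowflakeLayoutSketch {
    static final int NODE_ID_BITS = 10;
    static final int SEQUENCE_BITS = 12;
    static final int TIMESTAMP_BITS = 63 - NODE_ID_BITS - SEQUENCE_BITS; // 41; the sign bit stays unused

    /** Packs timestamp, node id and sequence into one long, timestamp in the high bits. */
    static long compose(long timestampMs, long nodeId, long sequence) {
        return (timestampMs << (NODE_ID_BITS + SEQUENCE_BITS)) | (nodeId << SEQUENCE_BITS) | sequence;
    }

    static long timestampMs(long id) {
        return id >>> (NODE_ID_BITS + SEQUENCE_BITS);
    }

    static long nodeId(long id) {
        return (id >>> SEQUENCE_BITS) & ((1L << NODE_ID_BITS) - 1);
    }

    static long sequence(long id) {
        return id & ((1L << SEQUENCE_BITS) - 1);
    }

    static long maxNodes() {
        return 1L << NODE_ID_BITS;
    }

    /** One full sequence range per millisecond. */
    static long maxIdsPerSecond() {
        return (1L << SEQUENCE_BITS) * 1000L;
    }

    /** How many years the timestamp field can cover from its starting point. */
    static double timestampRangeYears() {
        return (1L << TIMESTAMP_BITS) / (365.25 * 24 * 60 * 60 * 1000);
    }
}
```

Because the timestamp occupies the high bits, IDs from a later millisecond always compare greater than IDs from an earlier one, which is where the rough ordering comes from. The timestamp range works out to just under 70 years, so a 1970 starting point runs out in 2039.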