Message bus
The MessageBus is a fundamental part of the platform, enabling communication between system components through message passing. This design creates a loosely coupled architecture where components can interact without direct dependencies.
The messaging patterns include:
- Point-to-Point
- Publish/Subscribe
- Request/Response
Messages exchanged via the MessageBus fall into three categories:
- Data
- Events
- Commands
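To make the three messaging patterns concrete, here is a minimal in-process sketch. `ToyBus` and its topic and endpoint names are hypothetical stand-ins written for this illustration; they are not the NautilusTrader MessageBus implementation, and its real API may differ.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class ToyBus:
    """Toy in-process bus for illustration only (not the real MessageBus)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Handler]] = defaultdict(list)
        self._endpoints: dict[str, Handler] = {}

    # Publish/Subscribe: every handler subscribed to the topic gets the message.
    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, msg: Any) -> None:
        for handler in self._subscribers.get(topic, []):
            handler(msg)

    # Point-to-Point: exactly one handler is registered per endpoint name.
    def register(self, endpoint: str, handler: Handler) -> None:
        self._endpoints[endpoint] = handler

    def send(self, endpoint: str, msg: Any) -> None:
        self._endpoints[endpoint](msg)

    # Request/Response: the request carries a callback used for the reply.
    def request(self, endpoint: str, payload: Any, on_response: Handler) -> None:
        self._endpoints[endpoint]((payload, on_response))


bus = ToyBus()
received: list[Any] = []

bus.subscribe("events.fills", received.append)
bus.subscribe("events.fills", received.append)  # a second subscriber
bus.publish("events.fills", "fill-1")  # both subscribers receive it

bus.register("RiskEngine.execute", received.append)
bus.send("RiskEngine.execute", "submit-order")  # a single receiver


def price_service(request: tuple[str, Handler]) -> None:
    symbol, reply = request
    reply({"symbol": symbol, "price": 100.0})


bus.register("PriceService.request", price_service)
bus.request("PriceService.request", "BTCUSDT", received.append)
```

In this sketch the publisher never holds a reference to its subscribers, which is the loose coupling described above.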
Data and signal publishing
While the MessageBus is a lower-level component that users typically interact with indirectly, the Actor and Strategy classes provide convenient methods built on top of it:
def publish_data(self, data_type: DataType, data: Data) -> None:
def publish_signal(self, name: str, value, ts_event: int | None = None) -> None:
These methods let you publish custom data and signals without working directly with the MessageBus interface.
Direct access
For advanced users or specialized use cases, direct access to the message bus is available within Actor and Strategy classes through the self.msgbus reference, which provides the full message bus interface.
To publish a custom message directly, you can specify a topic as a str and any Python object as the message payload, for example:
self.msgbus.publish("MyTopic", "MyMessage")
Messaging styles
NautilusTrader is an event-driven framework where components communicate by sending and receiving messages. Understanding the different messaging styles is crucial for building effective trading systems.
This guide explains the three primary messaging patterns available in NautilusTrader:
Messaging Style | Purpose | Best For |
---|---|---|
MessageBus - Publish/Subscribe to topics | Low-level, direct access to the message bus | Custom events, system-level communication |
Actor-Based - Publish/Subscribe Data | Structured trading data exchange | Trading metrics, indicators, data needing persistence |
Actor-Based - Publish/Subscribe Signal | Lightweight notifications | Simple alerts, flags, status updates |
Each approach serves a different purpose. This guide will help you decide which messaging pattern to use in your NautilusTrader applications.
MessageBus publish/subscribe to topics
Concept
The MessageBus is the central hub for all messages in NautilusTrader. It enables a publish/subscribe pattern where components can publish events to named topics, and other components can subscribe to receive those messages.
This decouples components, allowing them to interact indirectly via the message bus.
Key benefits and use cases
The message bus approach is ideal when you need:
- Cross-component communication within the system.
- Flexibility to define any topic and send any type of payload (any Python object).
- Decoupling between publishers and subscribers who don't need to know about each other.
- Global reach, where messages can be received by multiple subscribers.
- Support for events that don't fit within the predefined Actor model.
- Advanced scenarios requiring full control over messaging.
Considerations
- You must track topic names manually (typos could result in missed messages).
- You must define handlers manually.
Quick overview code
from nautilus_trader.core.message import Event
# Define a custom event
class Each10thBarEvent(Event):
    TOPIC = "each_10th_bar"  # Topic name
    def __init__(self, bar):
        self.bar = bar
# Subscribe in a component (in Strategy)
self.msgbus.subscribe(Each10thBarEvent.TOPIC, self.on_each_10th_bar)
# Publish an event (in Strategy)
event = Each10thBarEvent(bar)
self.msgbus.publish(Each10thBarEvent.TOPIC, event)
# Handler (in Strategy)
def on_each_10th_bar(self, event: Each10thBarEvent):
    self.log.info(f"Received 10th bar: {event.bar}")
Actor-based publish/subscribe data
Concept
This approach provides a way to exchange trading-specific data between Actors in the system (note: each Strategy inherits from Actor). The published data class inherits from Data, which ensures proper timestamping and ordering of events, crucial for correct backtest processing.
Key Benefits and Use Cases
The data publish/subscribe approach excels when you need:
- Exchange of structured trading data like market data, indicators, custom metrics, or option greeks.
- Proper event ordering via built-in timestamps (ts_event, ts_init), crucial for backtest accuracy.
- Data persistence and serialization through the @customdataclass decorator, integrating seamlessly with NautilusTrader's data catalog system.
- Standardized trading data exchange between system components.
Considerations
- Requires defining a class that inherits from Data or uses @customdataclass.
Inheriting from Data vs. using @customdataclass
Inheriting from the Data class:
- Defines abstract properties ts_event and ts_init that must be implemented by the subclass. These ensure proper data ordering in backtests based on timestamps.
The @customdataclass decorator:
- Adds ts_event and ts_init attributes if they are not already present.
- Provides serialization functions: to_dict(), from_dict(), to_bytes(), to_arrow(), etc.
- Enables data persistence and external communication.
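As a rough mental model of what such a decorator contributes, the sketch below adds the two timestamp fields when they are missing and attaches dictionary round-trip helpers. `toy_customdataclass` is a hypothetical stand-in built on the standard library; it is not the real @customdataclass and omits features such as to_bytes() and to_arrow().

```python
from dataclasses import asdict, dataclass, field, fields, make_dataclass
from typing import Any


def toy_customdataclass(cls):
    """Hypothetical stand-in for illustration; NOT the real @customdataclass."""
    base = dataclass(cls)
    existing = {f.name for f in fields(base)}
    # Add the timestamp fields only if the class did not declare them itself.
    extra = [
        (name, int, field(default=0))
        for name in ("ts_event", "ts_init")
        if name not in existing
    ]
    new_cls = make_dataclass(cls.__name__, extra, bases=(base,))

    def to_dict(self) -> dict[str, Any]:
        return {"type": type(self).__name__, **asdict(self)}

    def from_dict(data: dict[str, Any]):
        values = {key: value for key, value in data.items() if key != "type"}
        return new_cls(**values)

    new_cls.to_dict = to_dict
    new_cls.from_dict = staticmethod(from_dict)
    return new_cls


@toy_customdataclass
class GreeksData:
    delta: float
    gamma: float


greeks = GreeksData(delta=0.75, gamma=0.1, ts_event=1, ts_init=2)
payload = greeks.to_dict()
restored = GreeksData.from_dict(payload)
```

The dictionary form is what makes persistence and external publishing possible, since it contains only primitive values.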
Quick overview code
from nautilus_trader.core.data import Data
from nautilus_trader.model.custom import customdataclass
from nautilus_trader.model.data import DataType
@customdataclass
class GreeksData(Data):
    delta: float
    gamma: float
# Publish data (in Actor / Strategy)
data = GreeksData(delta=0.75, gamma=0.1, ts_event=1_630_000_000_000_000_000, ts_init=1_630_000_000_000_000_000)
# publish_data takes a DataType (see its signature earlier), so the class is wrapped
self.publish_data(DataType(GreeksData), data)
# Subscribe to receiving data (in Actor / Strategy)
self.subscribe_data(DataType(GreeksData))
# Handler (a callback method with a fixed name)
def on_data(self, data: Data):
    if isinstance(data, GreeksData):
        self.log.info(f"Delta: {data.delta}, Gamma: {data.gamma}")
Actor-based publish/subscribe signal
Concept
Signals are a lightweight way to publish and subscribe to simple notifications within the actor framework. This is the simplest messaging approach, requiring no custom class definitions.
Key Benefits and Use Cases
The Signal messaging approach shines when you need:
- Simple, lightweight notifications/alerts like "RiskThresholdExceeded" or "TrendUp".
- Quick, on-the-fly messaging without defining custom classes.
- Broadcasting alerts or flags as primitive data (int, float, or str).
- Easy API integration with straightforward methods (publish_signal, subscribe_signal).
- Multiple subscriber communication where all subscribers receive signals when published.
- Minimal setup overhead with no class definitions required.
Considerations
- Each signal can contain only a single value of type int, float, or str. That means no support for complex data structures or other Python types.
- In the on_signal handler, you can only differentiate between signals using signal.value, as the signal name is not accessible in the handler.
Quick overview code
# Define signal constants for better organization (optional but recommended)
import types
from nautilus_trader.core.datetime import unix_nanos_to_dt
from nautilus_trader.common.enums import LogColor
signals = types.SimpleNamespace()
signals.NEW_HIGHEST_PRICE = "NewHighestPriceReached"
signals.NEW_LOWEST_PRICE = "NewLowestPriceReached"
# Subscribe to signals (in Actor/Strategy)
self.subscribe_signal(signals.NEW_HIGHEST_PRICE)
self.subscribe_signal(signals.NEW_LOWEST_PRICE)
# Publish a signal (in Actor/Strategy)
self.publish_signal(
    name=signals.NEW_HIGHEST_PRICE,
    value=signals.NEW_HIGHEST_PRICE,  # value can be the same as name for simplicity
    ts_event=bar.ts_event,  # timestamp from triggering event
)
# Handler (a callback method with a fixed name)
def on_signal(self, signal):
    # IMPORTANT: We match against signal.value, not signal.name
    match signal.value:
        case signals.NEW_HIGHEST_PRICE:
            self.log.info(
                "New highest price was reached. | "
                f"Signal value: {signal.value} | "
                f"Signal time: {unix_nanos_to_dt(signal.ts_event)}",
                color=LogColor.GREEN,
            )
        case signals.NEW_LOWEST_PRICE:
            self.log.info(
                "New lowest price was reached. | "
                f"Signal value: {signal.value} | "
                f"Signal time: {unix_nanos_to_dt(signal.ts_event)}",
                color=LogColor.RED,
            )
Summary and decision guide
Here's a quick reference to help you decide which messaging style to use:
Decision guide: Which style to choose?
Use Case | Recommended Approach | Setup required |
---|---|---|
Custom events or system-level communication | MessageBus + Pub/Sub to topic | Topic + Handler management |
Structured trading data | Actor + Pub/Sub Data + optional @customdataclass if serialization is needed | New class definition inheriting from Data (handler on_data is predefined) |
Simple alerts/notifications | Actor + Pub/Sub Signal | Just signal name |
External publishing
The MessageBus can be backed by any database or message broker technology that has an integration written for it. This enables external publishing of messages.
Redis is currently supported for all serializable messages which are published externally. The minimum supported Redis version is 6.2 (required for streams functionality).
Under the hood, when a backing database (or any other compatible technology) is configured, all outgoing messages are first serialized, then transmitted via a Multiple-Producer Single-Consumer (MPSC) channel to a separate thread (implemented in Rust). In this separate thread, the message is written to its final destination, which is presently Redis streams.
This design is primarily driven by performance considerations. By offloading the I/O operations to a separate thread, we ensure that the main thread remains unblocked and can continue its tasks without being hindered by the potentially time-consuming operations involved in interacting with a database or client.
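The hand-off described above can be sketched with the Python standard library. This is only an analogy under stated assumptions: the real channel and writer thread are implemented in Rust, and here a `queue.Queue` plays the MPSC channel while a plain list stands in for the Redis stream. The names `outbox`, `writer`, and `publish_external` are hypothetical.

```python
import json
import queue
import threading

outbox: queue.Queue = queue.Queue()  # plays the role of the MPSC channel
stream: list[bytes] = []  # stand-in for the external stream


def writer() -> None:
    """Drain the queue on a background thread and perform the slow writes."""
    while True:
        item = outbox.get()
        if item is None:  # sentinel: shut down the writer
            break
        stream.append(item)  # a real backend would do network I/O here


def publish_external(topic: str, payload: dict) -> None:
    """Serialize on the caller's thread, then hand off without waiting for I/O."""
    outbox.put(json.dumps({"topic": topic, "payload": payload}).encode())


thread = threading.Thread(target=writer, daemon=True)
thread.start()

for seq in range(3):
    publish_external("data.quotes", {"seq": seq})

outbox.put(None)
thread.join()
decoded = [json.loads(item) for item in stream]
```

The caller pays only for serialization and an enqueue; the write latency is absorbed by the background thread, and ordering is preserved because a single consumer drains the queue.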
Serialization
Nautilus supports serialization for:
- All Nautilus built-in types (serialized as dictionaries dict[str, Any] containing serializable primitives).
- Python primitive types (str, int, float, bool, bytes).
You can add serialization support for custom types by registering them through the serialization subpackage.
def register_serializable_type(
cls,
to_dict: Callable[[Any], dict[str, Any]],
from_dict: Callable[[dict[str, Any]], Any],
):
...
- cls: The type to register.
- to_dict: The delegate to instantiate a dict of primitive types from the object.
- from_dict: The delegate to instantiate the object from a dict of primitive types.
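The role of the two delegates can be illustrated with a small stand-in registry. Everything prefixed `toy_` below, along with the `RiskAlert` type, is hypothetical and exists only for this sketch; it mirrors the shape of the signature above but is not the library's registry, and JSON is used purely for readability.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable

# Hypothetical stand-in registry, keyed by type name.
_REGISTRY: dict[str, tuple[Callable[[Any], dict[str, Any]], Callable[[dict[str, Any]], Any]]] = {}


def toy_register_serializable_type(cls, to_dict, from_dict) -> None:
    _REGISTRY[cls.__name__] = (to_dict, from_dict)


def toy_serialize(obj: Any) -> bytes:
    to_dict, _ = _REGISTRY[type(obj).__name__]
    # The type name travels with the payload so the reader can pick a delegate.
    return json.dumps({"type": type(obj).__name__, **to_dict(obj)}).encode()


def toy_deserialize(raw: bytes) -> Any:
    data = json.loads(raw)
    _, from_dict = _REGISTRY[data.pop("type")]
    return from_dict(data)


@dataclass(frozen=True)
class RiskAlert:
    account: str
    exposure: float


toy_register_serializable_type(
    RiskAlert,
    to_dict=lambda alert: {"account": alert.account, "exposure": alert.exposure},
    from_dict=lambda data: RiskAlert(data["account"], data["exposure"]),
)

raw = toy_serialize(RiskAlert("ACC-1", 12500.0))
roundtrip = toy_deserialize(raw)
```

The key point is that `to_dict` must emit only primitive values, and `from_dict` must be able to rebuild the object from exactly that dictionary.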
Configuration
The message bus external backing technology can be configured by importing the MessageBusConfig object and passing it to your TradingNodeConfig. Each of these config options is described below.
...  # Other config omitted
message_bus=MessageBusConfig(
    database=DatabaseConfig(),
    encoding="json",
    timestamps_as_iso8601=True,
    buffer_interval_ms=100,
    autotrim_mins=30,
    use_trader_prefix=True,
    use_trader_id=True,
    use_instance_id=False,
    streams_prefix="streams",
    types_filter=[QuoteTick, TradeTick],
)
...
Database config
A DatabaseConfig must be provided. For a default Redis setup on the local loopback you can pass a DatabaseConfig(), which will use defaults to match.
Encoding
Two encodings are currently supported by the built-in Serializer used by the MessageBus:
- JSON (json)
- MessagePack (msgpack)
Use the encoding config option to control the message writing encoding.
The msgpack encoding is used by default as it offers the best serialization and memory performance.
We recommend using json encoding for human readability when performance is not a primary concern.
Timestamp formatting
By default, timestamps are formatted as UNIX epoch nanosecond integers. Alternatively, you can configure ISO 8601 string formatting by setting the timestamps_as_iso8601 option to True.
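The two representations carry the same instant. The sketch below converts one to the other with the standard library; the helper name `nanos_to_iso8601` is hypothetical, and the exact string layout that the serializer emits (number of fractional digits, `Z` suffix) is an assumption here rather than something this guide specifies.

```python
from datetime import datetime, timezone


def nanos_to_iso8601(ts_ns: int) -> str:
    """Format UNIX epoch nanoseconds as an ISO 8601 UTC string (illustrative)."""
    seconds, nanos = divmod(ts_ns, 1_000_000_000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # datetime only carries microseconds, so the nanosecond part is appended by hand.
    return f"{dt:%Y-%m-%dT%H:%M:%S}.{nanos:09d}Z"


as_integer = 1_630_000_000_000_000_001  # default style: epoch nanoseconds
as_string = nanos_to_iso8601(as_integer)  # ISO 8601 style
```

Integers are compact and sort naturally, while the string form is easier to read when inspecting a stream by hand.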
Message stream keys
Message stream keys are essential for identifying individual trader nodes and organizing messages within streams. They can be tailored to meet your specific requirements and use cases. In the context of message bus streams, a trader key is typically structured as follows:
trader:{trader_id}:{instance_id}:{streams_prefix}
The following options are available for configuring message stream keys:
Trader prefix
Whether the key should begin with the trader string (use_trader_prefix).
Trader ID
Whether the key should include the trader ID for the node (use_trader_id).
Instance ID
Each trader node is assigned a unique "instance ID", which is a UUIDv4. This instance ID helps distinguish individual traders when messages are distributed across multiple streams. You can include the instance ID in the trader key by setting the use_instance_id configuration option to True.
This is particularly useful when you need to track and identify traders across various streams in a multi-node trading system.
Streams prefix
The streams_prefix string enables you to group all streams for a single trader instance or organize messages for multiple instances. Configure this by passing a string to the streams_prefix configuration option, ensuring the other prefix options are set to False.
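How the options combine can be sketched as a small function. This is an assumption-based illustration: it supposes that a disabled segment is simply left out and that the remaining segments are joined with `:` as in the template above. `build_stream_key` is a hypothetical helper, and its defaults mirror the values in the example configuration rather than documented library defaults.

```python
def build_stream_key(
    trader_id: str,
    instance_id: str,
    streams_prefix: str = "streams",
    use_trader_prefix: bool = True,
    use_trader_id: bool = True,
    use_instance_id: bool = False,
) -> str:
    """Assemble a stream key from the enabled segments (illustrative only)."""
    parts: list[str] = []
    if use_trader_prefix:
        parts.append("trader")
    if use_trader_id:
        parts.append(trader_id)
    if use_instance_id:
        parts.append(instance_id)
    parts.append(streams_prefix)
    return ":".join(parts)


instance = "d7a0f5c2-1111-4222-8333-944455556666"  # made-up UUIDv4 for the example

default_key = build_stream_key("TESTER-001", instance)
full_key = build_stream_key("TESTER-001", instance, use_instance_id=True)
bare_key = build_stream_key(
    "TESTER-001",
    instance,
    streams_prefix="binance",
    use_trader_prefix=False,
    use_trader_id=False,
)
```

With every identifying segment disabled, only the streams prefix remains, which gives consumers a short, predictable key to listen on.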
Stream per topic
The stream_per_topic option indicates whether the producer writes a separate stream for each topic. If set to False, all messages are written to the same stream.
Redis does not support wildcard topics when listening to streams, so a consumer of per-topic streams has to name each stream key explicitly. For better compatibility with Redis, it is recommended to set this option to False.
Types filtering
When messages are published on the message bus, they are serialized and written to a stream if a backing for the message bus is configured and enabled. To prevent flooding the stream with data like high-frequency quotes, you may filter out certain types of messages from external publication.
To enable this filtering mechanism, pass a list of type objects to the types_filter parameter in the message bus configuration, specifying which types of messages should be excluded from external publication.
from nautilus_trader.config import MessageBusConfig
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.data import TradeTick
# Create a MessageBusConfig instance with types filtering
message_bus = MessageBusConfig(
    types_filter=[QuoteTick, TradeTick],
)
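The effect of the filter can be pictured as an exclusion check performed before a message is handed to the external writer. The sketch below uses hypothetical toy classes and an `isinstance` test; whether the real implementation matches exact types or also subclasses is not stated here, so treat that detail as an assumption.

```python
from dataclasses import dataclass


@dataclass
class ToyQuoteTick:
    bid: float
    ask: float


@dataclass
class ToyOrderFilled:
    order_id: str


def should_publish_externally(msg: object, types_filter: list[type]) -> bool:
    """Return False for messages whose type is listed in the filter."""
    return not isinstance(msg, tuple(types_filter))


types_filter = [ToyQuoteTick]
messages = [ToyQuoteTick(1.0, 1.1), ToyOrderFilled("O-1"), ToyQuoteTick(1.1, 1.2)]
external = [msg for msg in messages if should_publish_externally(msg, types_filter)]
```

Filtered messages are still delivered on the internal bus in this model; only the external stream is spared the high-frequency traffic.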
Stream auto-trimming
The autotrim_mins configuration parameter allows you to specify the lookback window in minutes for automatic stream trimming in your message streams.
Automatic stream trimming helps manage the size of your message streams by removing older messages, ensuring that the streams remain manageable in terms of storage and performance.
The current Redis implementation maintains autotrim_mins as a maximum width (plus roughly a minute, as streams are trimmed no more than once per minute), rather than as a maximum lookback window based on the current wall clock time.
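The distinction between a maximum width and a wall-clock lookback can be shown with a toy stream of (timestamp in milliseconds, message) pairs. The function below is a hypothetical illustration of the "width" reading, where the cutoff is measured back from the newest entry; it is not the Redis implementation and ignores the once-per-minute trimming cadence.

```python
def trim_to_width(
    entries: list[tuple[int, str]],
    autotrim_mins: int,
) -> list[tuple[int, str]]:
    """Keep entries within `autotrim_mins` of the newest entry (oldest first)."""
    if not entries:
        return []
    window_ms = autotrim_mins * 60_000
    newest_ms = entries[-1][0]
    return [(ts, msg) for ts, msg in entries if ts >= newest_ms - window_ms]


MINUTE_MS = 60_000
entries = [
    (0 * MINUTE_MS, "a"),
    (10 * MINUTE_MS, "b"),
    (35 * MINUTE_MS, "c"),
    (40 * MINUTE_MS, "d"),
]
kept = trim_to_width(entries, autotrim_mins=30)
```

Under this reading, a stream whose producer has gone quiet keeps its last window of messages, whereas a wall-clock lookback would eventually empty it.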
External streams
The message bus within a TradingNode (node) is referred to as the "internal message bus".
A producer node is one which publishes messages onto an external stream (see external publishing).
A consumer node listens to external streams to receive and publish deserialized message payloads on its internal message bus.
                  ┌───────────────────────────┐
                  │                           │
                  │       Producer Node       │
                  │                           │
                  └─────────────┬─────────────┘
                                │
┌───────────────────────────────▼──────────────────────────────┐
│                            Stream                            │
└─────────────┬────────────────────────────────────┬───────────┘
              │                                    │
┌─────────────▼───────────┐          ┌─────────────▼───────────┐
│                         │          │                         │
│     Consumer Node 1     │          │     Consumer Node 2     │
│                         │          │                         │
└─────────────────────────┘          └─────────────────────────┘
Set LiveDataEngineConfig.external_clients to the list of client_ids intended to represent the external streaming clients.
The DataEngine will filter out subscription commands for these clients, ensuring that the external streaming provides the necessary data for any subscriptions to these clients.
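The filtering behaviour can be pictured with a toy engine that drops subscription commands addressed to an external client instead of forwarding them. `ToyDataEngine` and its method names are hypothetical and only model the routing decision described above, not the real DataEngine.

```python
class ToyDataEngine:
    """Toy model of the routing decision only (not the real DataEngine)."""

    def __init__(self, external_clients: list[str]) -> None:
        self._external_clients = set(external_clients)
        self.forwarded: list[tuple[str, str]] = []  # commands sent to data clients

    def subscribe(self, client_id: str, topic: str) -> bool:
        """Return True if the command was forwarded to a data client."""
        if client_id in self._external_clients:
            # Data for this client is expected to arrive from the external stream.
            return False
        self.forwarded.append((client_id, topic))
        return True


engine = ToyDataEngine(external_clients=["BINANCE_EXT"])
sent_external = engine.subscribe("BINANCE_EXT", "quotes.BTCUSDT")
sent_regular = engine.subscribe("BINANCE", "quotes.BTCUSDT")
```

The strategy code stays the same in both cases; only the source of the data changes.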
Example configuration
The following example details a streaming setup where a producer node publishes Binance data externally, and a downstream consumer node publishes these data messages onto its internal message bus.
Producer node
We configure the MessageBus of the producer node to publish to a "binance" stream.
The settings use_trader_id, use_trader_prefix, and use_instance_id are all set to False to ensure a simple and predictable stream key that the consumer nodes can register for.
message_bus=MessageBusConfig(
    database=DatabaseConfig(timeout=2),
    use_trader_id=False,
    use_trader_prefix=False,
    use_instance_id=False,
    streams_prefix="binance",  # <---
    stream_per_topic=False,
    autotrim_mins=30,
),
Consumer node
We configure the MessageBus of the consumer node to receive messages from the same "binance" stream.
The node will listen to the external stream keys and publish these messages onto its internal message bus.
Additionally, we declare the client ID "BINANCE_EXT" as an external client. This ensures that the DataEngine does not attempt to send data commands to this client ID, as we expect these messages to be published onto the internal message bus from the external stream, with the node subscribed to the relevant topics.
data_engine=LiveDataEngineConfig(
    external_clients=[ClientId("BINANCE_EXT")],
),
message_bus=MessageBusConfig(
    database=DatabaseConfig(timeout=2),
    external_streams=["binance"],  # <---
),