Aggregate Metrics with M3 Aggregator
M3 Aggregator is a dedicated metrics aggregator that provides stateful, stream-based downsampling before storing metrics in M3DB nodes. It uses dynamic rules stored in etcd.
It uses leader election and aggregation window tracking, leveraging etcd to manage this state, to reliably emit at-least-once aggregations for downsampled metrics to long-term storage. This provides cost-effective and reliable downsampling and rollup of metrics.
M3 Coordinator can also perform this role, but M3 Aggregator shards and replicates metrics, whereas M3 Coordinator does not and therefore requires care to deploy and run in a highly available way.
Similar to M3DB, M3 Aggregator supports clustering and replication by default. This means that metrics are correctly routed to the instance(s) responsible for aggregating each metric and you can configure multiple M3 Aggregator replicas so that there are no single points of failure for aggregation.
M3 Aggregator is in heavy development to make it more usable without requiring you to write your own compatible producer and consumer.
Configuration
Before setting up m3aggregator, make sure that you have at least one M3DB node running and a dedicated m3coordinator setup.
We highly recommend running with a replication factor of at least 2 for an m3aggregator deployment. If you run with a replication factor of 1, then restarting an aggregator will temporarily interrupt the stream of aggregated metrics and cause some data loss.
Topology
Initializing aggregator topology
You can set up an m3aggregator topology by issuing a request to your coordinator (be sure to use your own hostnames, number of shards, and replication factor):
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3aggregator/placement/init -d '{
"num_shards": 64,
"replication_factor": 2,
"instances": [
{
"id": "m3aggregator01:6000",
"isolation_group": "availability-zone-a",
"zone": "embedded",
"weight": 100,
"endpoint": "m3aggregator01:6000",
"hostname": "m3aggregator01",
"port": 6000
},
{
"id": "m3aggregator02:6000",
"isolation_group": "availability-zone-b",
"zone": "embedded",
"weight": 100,
"endpoint": "m3aggregator02:6000",
"hostname": "m3aggregator02",
"port": 6000
}
]
}'
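To confirm the new placement, you can read it back; a minimal check, assuming your coordinator exposes the standard placement GET endpoint:
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3aggregator/placement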
Initializing m3msg topic for m3aggregator to receive from m3coordinators to aggregate metrics
Now we must set up a topic for the m3aggregator instances to receive unaggregated metrics from m3coordinator instances:
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregator_ingest" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'
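You can verify the topic was created by fetching it back; a minimal check, assuming the topic GET endpoint:
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregator_ingest" http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic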
Add m3aggregator consumer group to ingest topic
Add the m3aggregator placement to receive traffic from the topic (make sure to set the message TTL to match your desired maximum in-memory retry message buffer):
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregator_ingest" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3aggregator",
"environment": "namespace/m3db-cluster-name",
"zone": "embedded"
},
"consumptionType": "REPLICATED",
"messageTtlNanos": "300000000000"
}
}'
Note: 300000000000 nanoseconds is a TTL of 5 minutes for messages to rebuffer for retry.
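If you want a different buffer window, the conversion is simply seconds to nanoseconds. For example, a hypothetical 10 minute TTL:
# 10 minutes = 600 seconds; 600 * 1e9 nanoseconds
echo $((600 * 1000000000))
# prints 600000000000, the value to use for messageTtlNanos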
Initializing m3msg topic for m3coordinator to receive from m3aggregator to write to M3DB
Now we must set up a topic for the m3coordinator instances to receive aggregated metrics from m3aggregator instances to write to M3DB:
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregated_metrics" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'
Initializing m3coordinator topology
Then m3coordinator instances need to be configured to receive traffic for this topic (note: the ingest port 7507 must match the configured port of your m3coordinator ingest server; see the config at the bottom of this guide):
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3coordinator/placement/init -d '{
"instances": [
{
"id": "m3coordinator01",
"zone": "embedded",
"endpoint": "m3coordinator01:7507",
"hostname": "m3coordinator01",
"port": 7507
}
]
}'
Note: when you add or remove m3coordinator instances, this placement must be updated to match.
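For example, to add a hypothetical second instance m3coordinator02 later, POST just the new instance to the placement endpoint:
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3coordinator/placement -d '{
"instances": [
{
"id": "m3coordinator02",
"zone": "embedded",
"endpoint": "m3coordinator02:7507",
"hostname": "m3coordinator02",
"port": 7507
}
]
}'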
Add m3coordinator consumer group to outbound topic
Add the m3coordinator placement to receive traffic from the topic (make sure to set the message TTL to match your desired maximum in-memory retry message buffer):
curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregated_metrics" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3coordinator",
"environment": "namespace/m3db-cluster-name",
"zone": "embedded"
},
"consumptionType": "SHARED",
"messageTtlNanos": "300000000000"
}
}'
Note: 300000000000 nanoseconds is a TTL of 5 minutes for messages to rebuffer for retry.
Running
Dedicated Coordinator
Metrics will still arrive at the m3coordinator; they simply need to be forwarded to an m3aggregator. The m3coordinator then also needs to receive the metrics that have been aggregated by the m3aggregator and store them in M3DB, so an ingestion server must also be configured.
Here is the config you should add to your m3coordinator:
# This is for sending metrics to the remote m3aggregators
downsample:
remoteAggregator:
client:
type: m3msg
m3msg:
producer:
writer:
topicName: aggregator_ingest
topicServiceOverride:
zone: embedded
environment: namespace/m3db-cluster-name
placement:
isStaged: true
placementServiceOverride:
namespaces:
placement: /placement
connection:
numConnections: 4
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
# This is for configuring the ingestion server that will receive metrics from the m3aggregators on port 7507
ingest:
ingester:
workerPoolSize: 10000
opPool:
size: 10000
retry:
maxRetries: 3
jitter: true
logSampleRate: 0.01
m3msg:
server:
listenAddress: "0.0.0.0:7507"
retry:
maxBackoff: 10s
jitter: true
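After restarting the m3coordinator with this config, one quick sanity check is to confirm the ingest server is accepting TCP connections on the configured port (assuming netcat is available; m3coordinator01 is the example host from the placement above):
# exits 0 if the m3msg ingest server is listening
nc -z m3coordinator01 7507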
M3 Aggregator
You can run m3aggregator by either building and running the binary yourself:
make m3aggregator
./bin/m3aggregator -f ./src/aggregator/config/m3aggregator.yml
Or you can run it with Docker using the Dockerfile located at docker/m3aggregator/Dockerfile or the publicly provided image quay.io/m3db/m3aggregator:latest.
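For example, a Docker invocation might look like the following sketch; the in-container config path and published ports are assumptions, so verify them against docker/m3aggregator/Dockerfile:
# mount your config and publish the m3msg (6000), http (6001) and metrics (6002) ports from the config below
docker run -d \
  -e M3AGGREGATOR_HOST_ID=m3aggregator01 \
  -p 6000:6000 -p 6001:6001 -p 6002:6002 \
  -v $(pwd)/m3aggregator.yml:/etc/m3aggregator/m3aggregator.yml \
  quay.io/m3db/m3aggregator:latest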
You can use a config like the following, making note of the topics used, aggregator_ingest and aggregated_metrics, and the corresponding environment namespace/m3db-cluster-name:
logging:
level: info
metrics:
scope:
prefix: m3aggregator
prometheus:
onError: none
handlerPath: /metrics
listenAddress: 0.0.0.0:6002
timerType: histogram
sanitization: prometheus
samplingRate: 1.0
extended: none
m3msg:
server:
listenAddress: 0.0.0.0:6000
retry:
maxBackoff: 10s
jitter: true
consumer:
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
http:
listenAddress: 0.0.0.0:6001
readTimeout: 60s
writeTimeout: 60s
kvClient:
etcd:
env: namespace/m3db-cluster-name
zone: embedded
service: m3aggregator
cacheDir: /var/lib/m3kv
etcdClusters:
- zone: embedded
endpoints:
- dbnode01:2379
runtimeOptions:
kvConfig:
environment: namespace/m3db-cluster-name
zone: embedded
writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second
writeValuesPerMetricLimitPerSecond: 0
writeNewMetricLimitClusterPerSecondKey: write-new-metric-limit-cluster-per-second
writeNewMetricLimitClusterPerSecond: 0
writeNewMetricNoLimitWarmupDuration: 0
aggregator:
hostID:
resolver: environment
envVarName: M3AGGREGATOR_HOST_ID
instanceID:
type: host_id
verboseErrors: true
metricPrefix: ""
counterPrefix: ""
timerPrefix: ""
gaugePrefix: ""
aggregationTypes:
counterTransformFnType: empty
timerTransformFnType: suffix
gaugeTransformFnType: empty
aggregationTypesPool:
size: 1024
quantilesPool:
buckets:
- count: 256
capacity: 4
- count: 128
capacity: 8
stream:
eps: 0.001
capacity: 32
streamPool:
size: 4096
samplePool:
size: 4096
floatsPool:
buckets:
- count: 4096
capacity: 16
- count: 2048
capacity: 32
- count: 1024
capacity: 64
client:
type: m3msg
m3msg:
producer:
writer:
topicName: aggregator_ingest
topicServiceOverride:
zone: embedded
environment: namespace/m3db-cluster-name
placement:
isStaged: true
placementServiceOverride:
namespaces:
placement: /placement
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
placementManager:
kvConfig:
namespace: /placement
environment: namespace/m3db-cluster-name
zone: embedded
placementWatcher:
key: m3aggregator
initWatchTimeout: 10s
hashType: murmur32
bufferDurationBeforeShardCutover: 10m
bufferDurationAfterShardCutoff: 10m
bufferDurationForFutureTimedMetric: 10m # Allow test to write into future.
resignTimeout: 1m
flushTimesManager:
kvConfig:
environment: namespace/m3db-cluster-name
zone: embedded
flushTimesKeyFmt: shardset/%d/flush
flushTimesPersistRetrier:
initialBackoff: 100ms
backoffFactor: 2.0
maxBackoff: 2s
maxRetries: 3
electionManager:
election:
leaderTimeout: 10s
resignTimeout: 10s
ttlSeconds: 10
serviceID:
name: m3aggregator
environment: namespace/m3db-cluster-name
zone: embedded
electionKeyFmt: shardset/%d/lock
campaignRetrier:
initialBackoff: 100ms
backoffFactor: 2.0
maxBackoff: 2s
forever: true
jitter: true
changeRetrier:
initialBackoff: 100ms
backoffFactor: 2.0
maxBackoff: 5s
forever: true
jitter: true
resignRetrier:
initialBackoff: 100ms
backoffFactor: 2.0
maxBackoff: 5s
forever: true
jitter: true
campaignStateCheckInterval: 1s
shardCutoffCheckOffset: 30s
flushManager:
checkEvery: 1s
jitterEnabled: true
maxJitters:
- flushInterval: 5s
maxJitterPercent: 1.0
- flushInterval: 10s
maxJitterPercent: 0.5
- flushInterval: 1m
maxJitterPercent: 0.5
- flushInterval: 10m
maxJitterPercent: 0.5
- flushInterval: 1h
maxJitterPercent: 0.25
numWorkersPerCPU: 0.5
flushTimesPersistEvery: 10s
maxBufferSize: 5m
forcedFlushWindowSize: 10s
flush:
handlers:
- dynamicBackend:
name: m3msg
hashType: murmur32
producer:
writer:
topicName: aggregated_metrics
topicServiceOverride:
zone: embedded
environment: namespace/m3db-cluster-name
messagePool:
size: 16384
watermark:
low: 0.2
high: 0.5
passthrough:
enabled: true
forwarding:
maxConstDelay: 5m # Need to add some buffer window, since timed metrics by default are delayed by 1min.
entryTTL: 1h
entryCheckInterval: 10m
maxTimerBatchSizePerWrite: 140
defaultStoragePolicies: []
maxNumCachedSourceSets: 2
discardNaNAggregatedValues: true
entryPool:
size: 4096
counterElemPool:
size: 4096
timerElemPool:
size: 4096
gaugeElemPool:
size: 4096
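Note that with the hostID resolver above, the aggregator reads its identity from the M3AGGREGATOR_HOST_ID environment variable, and the resolved ID must line up with the corresponding instance in the aggregator placement. For example, when running the binary directly (m3aggregator01 matches the example placement above; verify against your own):
export M3AGGREGATOR_HOST_ID=m3aggregator01
./bin/m3aggregator -f ./src/aggregator/config/m3aggregator.yml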
Usage
Send metrics as usual to your m3coordinator instances in a round-robin fashion (or with any other load balancing strategy). The metrics will be forwarded to the m3aggregator instances, and once aggregated they will be returned to the m3coordinator instances to be written to M3DB.
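As a minimal end-to-end smoke test, you can write a sample through a coordinator and, once an aggregation window has flushed, query it back. This sketch assumes the coordinator's JSON write endpoint and Prometheus-compatible query endpoint on port 7201; agg_smoke_test is a hypothetical metric name:
# write a test sample via the coordinator
curl -sSf -X POST http://m3coordinator01:7201/api/v1/json/write -d '{
"tags": { "__name__": "agg_smoke_test", "host": "test01" },
"timestamp": "'$(date +%s)'",
"value": 42.0
}'
# after the aggregation window flushes, query it back
curl -sSf "http://m3coordinator01:7201/api/v1/query?query=agg_smoke_test"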