Apply Traffic Control Policies

Overview

Conduktor Gateway offers a number of Interceptors that apply Traffic Control Policies to data.

Alter broker config

The alter broker config policy Interceptor imposes limits on configuration changes so that any broker configuration changed in the cluster adheres to the configured specification.

The full list of Kafka configurations that this Interceptor protects is:

  • log.retention.bytes
  • log.retention.ms
  • log.segment.bytes

What happens when sending an invalid request

Any request that doesn't match the Interceptor's configuration will be blocked and return the corresponding error message.

For example, suppose you try to set log.retention.ms = 1 while the Interceptor is configured with min set to 10 for logRetentionMs. When you send that request to the cluster, the following error is returned:

org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy. log.retention.ms is '1', must not be less than '10'

Configuration

The Interceptor accepts the following configuration values. Each value is a structured object, described in the tables that follow.

| Key | Type | Description |
| --- | --- | --- |
| blacklist | BlackList | Blacklist of properties which cannot be changed |
| logRetentionBytes | Long | Configuration for log.retention.bytes |
| logRetentionMs | Long | Configuration for log.retention.ms |
| logSegmentBytes | Long | Configuration for log.segment.bytes |

BlackList

| Key | Type | Description |
| --- | --- | --- |
| values | Set String | Set of property names that cannot be changed |
| action | Action | Action to take when a request tries to change a blacklisted property. |

Long

| Key | Type | Description |
| --- | --- | --- |
| min | double | Minimum value for the configuration. |
| max | double | Maximum value for the configuration. |
| action | Action | Action to take if the value is outside the specified range. |
| overrideValue | double | Value to override with (only applicable when action is set to OVERRIDE). |

Action

  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the out-of-policy value and record the violation in the audit log.
  • OVERRIDE - execute the request with overrideValue instead, and record in the audit log that the value was changed on the fly (both the rejected value and the value used to replace it).

Example

{
  "name": "myAlterBrokerConfigPolicy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.AlterBrokerConfigPolicyPlugin",
  "priority": 100,
  "config": {
    "logRetentionBytes": {
      "min": 10,
      "max": 100,
      "action": "BLOCK"
    },
    "logRetentionMs": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "logSegmentBytes": {
      "min": 10,
      "max": 100,
      "action": "INFO"
    }
  }
}
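
The BlackList option from the configuration table can be sketched in the same shape. This is a minimal illustration rather than a verified configuration: the Interceptor name is hypothetical, and it assumes that blacklist entries are written as Kafka property names and that max may be omitted when only a lower bound is needed.

```json
{
  "name": "myAlterBrokerConfigBlacklistPolicy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.AlterBrokerConfigPolicyPlugin",
  "priority": 100,
  "config": {
    "blacklist": {
      "values": ["log.segment.bytes"],
      "action": "BLOCK"
    },
    "logRetentionMs": {
      "min": 60000,
      "action": "BLOCK"
    }
  }
}
```

Under those assumptions, any attempt to change log.segment.bytes would be rejected, and log.retention.ms could only be set to 60000 or higher.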

Alter topic config

The alter topic config policy Interceptor imposes limits on configuration changes so that any configuration changed on a topic adheres to the configured specification.

The full list of Kafka configurations that this Interceptor protects is:

  • retention.ms
  • retention.bytes
  • segment.ms
  • segment.bytes
  • segment.jitter.ms
  • flush.messages
  • flush.ms
  • max.message.bytes
  • min.insync.replicas
  • cleanup.policy
  • unclean.leader.election.enable

Sending an invalid request

Any request that doesn't match the Interceptor's configuration will be blocked and return the corresponding error message.

For example, suppose you try to set retention.ms = 1 while the Interceptor is configured with min set to 10 for retentionMs. When you send that request to the cluster, the following error is returned:

org.apache.kafka.common.errors.PolicyViolationException: 
Request parameters do not satisfy the configured policy. retention.ms is '1', must not be less than '10'

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied |
| blacklist | BlackList |  | Blacklist of properties which cannot be changed |
| retentionMs | Long |  | Configuration for retention.ms |
| retentionBytes | Long |  | Configuration for retention.bytes |
| segmentMs | Long |  | Configuration for segment.ms |
| segmentBytes | Integer |  | Configuration for segment.bytes |
| segmentJitterMs | Long |  | Configuration for segment.jitter.ms |
| flushMessages | Long |  | Configuration for flush.messages |
| flushMs | Long |  | Configuration for flush.ms |
| maxMessageBytes | Integer |  | Configuration for max.message.bytes |
| minInsyncReplicas | Integer |  | Configuration for min.insync.replicas |
| cleanupPolicy | Cleanup policy |  | Configuration for cleanup.policy |
| uncleanLeaderElectionEnable | Boolean |  | Configuration for unclean.leader.election.enable |

BlackList

| Key | Type | Description |
| --- | --- | --- |
| values | Set String | Set of property names that cannot be changed |
| action | Action | Action to take when a request tries to change a blacklisted property. |

Integer

| Key | Type | Description |
| --- | --- | --- |
| min | int | Minimum value for the configuration. |
| max | int | Maximum value for the configuration. |
| action | Action | Action to take if the value is outside the specified range. |
| overrideValue | int | Value to override with (only applicable when action is set to OVERRIDE). |

Long

| Key | Type | Description |
| --- | --- | --- |
| min | double | Minimum value for the configuration. |
| max | double | Maximum value for the configuration. |
| action | Action | Action to take if the value is outside the specified range. |
| overrideValue | double | Value to override with (only applicable when action is set to OVERRIDE). |

Cleanup policy

| Key | Type | Description |
| --- | --- | --- |
| values | Set String | Set of allowed values for the configuration: delete, compact, or both policies as a comma-separated list (e.g. delete,compact). |
| action | Action | Action to take if the value is outside the specified range. |
| overrideValue | String | Value to override with (only applicable when action is set to OVERRIDE). |

Boolean

| Key | Type | Description |
| --- | --- | --- |
| value | Boolean | Value for the configuration. If action is OVERRIDE, this value is used as the override value. |
| action | Action | Action to take if the value is outside the specified range. |

Action

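  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the out-of-policy value and record the violation in the audit log.
  • OVERRIDE - execute the request with overrideValue (or value, for types that have no overrideValue) instead, and record in the audit log that the value was changed on the fly (both the rejected value and the value used to replace it).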

Example

{
  "name": "myAlterTopicConfigPolicy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.AlterTopicConfigPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": ".*",
    "retentionMs": {
      "min": 10,
      "max": 100
    },
    "retentionBytes": {
      "min": 10,
      "max": 100,
      "action": "BLOCK"
    },
    "segmentMs": {
      "min": 10,
      "max": 100,
      "action": "INFO"
    },
    "segmentBytes": {
      "min": 10,
      "max": 100,
      "action": "BLOCK"
    },
    "segmentJitterMs": {
      "min": 10,
      "max": 100,
      "action": "INFO",
      "overrideValue": 20
    },
    "flushMessages": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "flushMs": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "maxMessageBytes": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "minInsyncReplicas": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "cleanupPolicy": {
      "values": [
        "delete",
        "compact"
      ],
      "action": "OVERRIDE"
    },
    "uncleanLeaderElectionEnable": {
      "value": false,
      "action": "BLOCK"
    }
  }
}
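
A narrower policy, scoped to a subset of topics, might look like the following sketch. The Interceptor name and the payments- topic prefix are hypothetical, and the sketch assumes that keys left out of config are simply not checked.

```json
{
  "name": "myPaymentsTopicConfigPolicy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.AlterTopicConfigPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "payments-.*",
    "minInsyncReplicas": {
      "min": 2,
      "max": 3,
      "action": "OVERRIDE",
      "overrideValue": 2
    },
    "uncleanLeaderElectionEnable": {
      "value": false,
      "action": "BLOCK"
    }
  }
}
```

Here an out-of-range min.insync.replicas would be replaced with 2, while an attempt to enable unclean.leader.election.enable on a matching topic would be rejected.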

ClientId required

If the client ID does not match the specified naming convention, the Interceptor responds with a PolicyViolationException when action is BLOCK. When action is OVERRIDE, it replaces the client ID with one generated from a template.

Templates such as clientId-{{userIp}}-testing are supported. The following values can be expanded:

  • uuid
  • userIp
  • vcluster
  • user
  • clientId
  • gatewayIp
  • gatewayHost
  • gatewayVersion
  • apiKey
  • apiKeyVersion
  • timestampMillis

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| clientIdTemplate | String |  | Template used to generate the client ID when overriding |
| namingConvention | String | .* | Regex that valid client IDs must match |
| action | Action |  | Action to take if the client ID is invalid |

Action

  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the invalid client ID and record the violation in the audit log.
  • OVERRIDE - execute the request with a client ID generated from the template, and record in the audit log that the value was changed on the fly.

Example

{
  "name": "client-id-required-policy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ClientIdRequiredPolicyPlugin",
  "priority": 100,
  "config": {
    "namingConvention": "clientId-.*",
    "action": "BLOCK"
  }
}

{
  "name": "client-id-required-policy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ClientIdRequiredPolicyPlugin",
  "priority": 100,
  "config": {
    "namingConvention": "clientId-.*",
    "action": "INFO"
  }
}

{
  "name": "client-id-required-policy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ClientIdRequiredPolicyPlugin",
  "priority": 100,
  "config": {
    "clientIdTemplate": "clientId-{{userIp}}-testing",
    "namingConvention": "clientId-.*",
    "action": "OVERRIDE"
  }
}
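
The other placeholders from the list above can be used in the same way. The sketch below combines user and uuid in one template; that several placeholders may appear in a single template is an assumption, since the source only shows a template with one.

```json
{
  "name": "client-id-required-policy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ClientIdRequiredPolicyPlugin",
  "priority": 100,
  "config": {
    "clientIdTemplate": "clientId-{{user}}-{{uuid}}",
    "namingConvention": "clientId-.*",
    "action": "OVERRIDE"
  }
}
```

Note that the template itself starts with clientId-, so the generated client ID satisfies the namingConvention regex.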

Consumer group policy

The consumer group policy Interceptor is designed to enhance the reliability and efficiency of Kafka consumer group operations.

By enforcing specific configuration policies, it ensures that consumer groups adhere to predefined rules, thereby preventing potential issues.

Sending an invalid request

For example, suppose a consumer is configured with the groupId invalid_group_id, but the Interceptor is configured with groupId=conduktor_group_id.*.

Block request

Any request that doesn't match the Interceptor's configuration will be blocked and return the corresponding error message. When a consumer sends that configuration to the cluster, the following error is returned:

org.apache.kafka.common.errors.PolicyViolationException: 
Request parameters do not satisfy the configured policy. GroupId 'invalid_group_id' is invalid.

Info on request

The groupId invalid_group_id is still accepted, and an audit record is written with the following error: Request parameters do not satisfy the configured policy. GroupId 'invalid_group_id' is invalid.

Configuration

| Key | Type | Description |
| --- | --- | --- |
| groupId | Regex | Configuration for groupId. |
| sessionTimeoutMs | Integer | Configuration for session timeout. |
| rebalanceTimeoutMs | Integer | Configuration for rebalance timeout. |
| memberId | Regex | Configuration for memberId. |
| groupInstanceId | Regex | Configuration for groupInstanceId. |

Regex

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | String |  | Value as a regex; request values matching this regex will have the Interceptor applied. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Integer

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| min | int |  | Minimum value for the configuration. |
| max | int |  | Maximum value for the configuration. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| overrideValue | int |  | Value to override with (only applicable when action is set to OVERRIDE). |

Action

  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the out-of-policy value and record the violation in the audit log.
  • OVERRIDE - execute the request with overrideValue instead, and record in the audit log that the value was changed on the fly (both the rejected value and the value used to replace it).

Example

{
  "name": "myConsumerGroupPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ConsumerGroupPolicyPlugin",
  "priority": 100,
  "config": {
    "groupId": {
      "value": "group.*",
      "action": "BLOCK"
    },
    "sessionTimeoutMs": {
      "max": 60000,
      "action": "INFO"
    },
    "rebalanceTimeoutMs": {
      "min": 30000,
      "action": "OVERRIDE",
      "overrideValue": 40000
    },
    "memberId": {
      "value": "member.*",
      "action": "INFO"
    },
    "groupInstanceId": {
      "value": "groupInstance.*",
      "action": "BLOCK"
    }
  }
}

Create topic policy

By default, Kafka allows topics to be created freely, which can lead to invalid topics being created in the cluster. The create topic policy Interceptor limits topic creation so that any topic created in the cluster adheres to a minimum/maximum specification for replication factor and partition count, as well as for topic-level configs.

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied |
| namingConvention | Regex |  | Configuration for validating topic name convention |
| numPartition | Integer |  | Configuration for number of partitions |
| replicationFactor | Integer |  | Configuration for number of replicas |
| cleanupPolicy | Cleanup policy |  | Configuration for cleanup.policy |
| compressionType | Compression type |  | Configuration for compression.type |
| deleteRetentionMs | Long |  | Configuration for delete.retention.ms |
| fileDeleteDelayMs | Long |  | Configuration for file.delete.delay.ms |
| flushMessages | Long |  | Configuration for flush.messages |
| flushMs | Long |  | Configuration for flush.ms |
| indexIntervalBytes | Integer |  | Configuration for index.interval.bytes |
| maxCompactionLagMs | Long |  | Configuration for max.compaction.lag.ms |
| maxMessageBytes | Integer |  | Configuration for max.message.bytes |
| messageTimestampDifferenceMaxMs | Long |  | Configuration for message.timestamp.difference.max.ms |
| messageTimestampType | Message timestamp type |  | Configuration for message.timestamp.type |
| minCleanableDirtyRatio | Double |  | Configuration for min.cleanable.dirty.ratio |
| minCompactionLagMs | Long |  | Configuration for min.compaction.lag.ms |
| minInsyncReplicas | Integer |  | Configuration for min.insync.replicas |
| preallocate | Boolean |  | Configuration for preallocate |
| retentionBytes | Long |  | Configuration for retention.bytes |
| retentionMs | Long |  | Configuration for retention.ms |
| segmentBytes | Integer |  | Configuration for segment.bytes |
| segmentIndexBytes | Integer |  | Configuration for segment.index.bytes |
| segmentJitterMs | Long |  | Configuration for segment.jitter.ms |
| segmentMs | Long |  | Configuration for segment.ms |
| uncleanLeaderElectionEnable | Boolean |  | Configuration for unclean.leader.election.enable |
| messageDownconversionEnable | Boolean |  | Configuration for message.downconversion.enable |

Regex

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | String |  | Regex for validating topic name |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Integer

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| min | int |  | Minimum value for the configuration. |
| max | int |  | Maximum value for the configuration. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| overrideValue | int |  | Value to override with (only applicable when action is set to OVERRIDE). |

Long

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| min | double |  | Minimum value for the configuration. |
| max | double |  | Maximum value for the configuration. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| overrideValue | double |  | Value to override with (only applicable when action is set to OVERRIDE). |

Cleanup policy

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| values | Set String |  | Set of allowed values for the configuration: delete, compact, or both policies as a comma-separated list (e.g. delete,compact). |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| overrideValue | String |  | Value to override with (only applicable when action is set to OVERRIDE). |

Compression type

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| values | Set Compression |  | Set of allowed compression types. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| overrideValue | Compression |  | Value to override with (only applicable when action is set to OVERRIDE). |

Message timestamp type

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | String |  | Allowed value: CreateTime or LogAppendTime. If action is OVERRIDE, this value is used as the override value. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Boolean

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | Boolean |  | Value for the configuration. If action is OVERRIDE, this value is used as the override value. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Compression

  • uncompressed
  • gzip
  • snappy
  • lz4
  • zstd
  • producer

Action

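  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the out-of-policy value and record the violation in the audit log.
  • OVERRIDE - execute the request with overrideValue (or value, for types that have no overrideValue) instead, and record in the audit log that the value was changed on the fly (both the rejected value and the value used to replace it).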

Example

{
  "name": "myCreateTopicPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic_1.*",
    "numPartition": {
      "min": 5,
      "max": 5,
      "action": "BLOCK"
    },
    "replicationFactor": {
      "min": 2,
      "max": 4,
      "action": "OVERRIDE",
      "overrideValue": 3
    },
    "retentionMs": {
      "min": 10,
      "max": 100
    },
    "retentionBytes": {
      "min": 10,
      "max": 100,
      "action": "BLOCK"
    },
    "segmentMs": {
      "min": 10,
      "max": 100,
      "action": "INFO"
    },
    "segmentBytes": {
      "min": 10,
      "max": 100,
      "action": "BLOCK"
    },
    "segmentJitterMs": {
      "min": 10,
      "max": 100,
      "action": "INFO",
      "overrideValue": 20
    },
    "flushMessages": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "flushMs": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "maxMessageBytes": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "minInsyncReplicas": {
      "min": 10,
      "max": 100,
      "action": "OVERRIDE",
      "overrideValue": 20
    },
    "cleanupPolicy": {
      "values": [
        "delete",
        "compact"
      ],
      "action": "OVERRIDE"
    },
    "uncleanLeaderElectionEnable": {
      "value": false,
      "action": "BLOCK"
    },
    "compressionType": {
      "values": [
        "producer",
        "gzip"
      ],
      "action": "BLOCK"
    }
  }
}
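
The namingConvention and messageTimestampType options from the configuration table follow the Regex and Message timestamp type shapes described above. The following is a sketch only: the Interceptor name and the team-based naming regex are hypothetical, and it assumes that keys left out of config are not checked.

```json
{
  "name": "myCreateTopicNamingPolicy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
  "priority": 100,
  "config": {
    "namingConvention": {
      "value": "team-[a-z]+-.*",
      "action": "BLOCK"
    },
    "messageTimestampType": {
      "value": "LogAppendTime",
      "action": "OVERRIDE"
    }
  }
}
```

With this configuration, a topic whose name does not match the regex would be rejected, and a topic requesting a different message.timestamp.type would be created with LogAppendTime instead.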

Fetch policy

The fetch policy Interceptor can either flag (log) or block fetch requests that do not meet the specified configuration.

Sending an invalid request

Any request that doesn't match the Interceptor's configuration will be blocked and return the corresponding error message. For example, suppose you send a fetch request with isolationLevel=read_uncommitted, but the Interceptor is configured with isolationLevel=read_committed.

When you send that request to the cluster, the consumer retries the request and the following error is logged in the Gateway:

org.apache.kafka.common.errors.PolicyViolationException: 
Request parameters do not satisfy the configured policy. Topic 'topicName' with isolationLevel is READ_UNCOMMITTED, must be READ_COMMITTED

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied. If no value is set, it will be applied to all topics. |
| isolationLevel | IsolationLevel |  | Configuration for isolation level |
| rackIdRequired | Boolean |  | Configuration of rackId usage |
| fetchMaxBytes | SafeguardIntegerConfig |  | Configuration for maxBytes |
| fetchMinBytes | SafeguardIntegerConfig |  | Configuration for minBytes |
| maxWaitMs | SafeguardIntegerConfig |  | Configuration for maxWaitMs |
| version | Version |  | Configuration for fetch version |

Isolation Level

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | Isolation |  | Isolation level for fetch request |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Boolean

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | Boolean |  | Value for the configuration |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Version

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| min | int |  | Minimum value of fetch version |
| max | int |  | Maximum value of fetch version |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

SafeguardIntegerConfig

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| min | int |  | Minimum value of property |
| max | int |  | Maximum value of property |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Isolation

  • read_uncommitted
  • read_committed

Action

  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the out-of-policy value and record the violation in the audit log.

Example

{
  "name": "myFetchPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.FetchPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "isolationLevel": {
      "value": "read_uncommitted",
      "action": "BLOCK"
    },
    "rackIdRequired": {
      "value": true,
      "action": "BLOCK"
    },
    "fetchMaxBytes": {
      "min": 1000,
      "max": 3000,
      "action": "INFO"
    },
    "fetchMinBytes": {
      "min": 1,
      "max": 500,
      "action": "INFO"
    },
    "maxWaitMs": {
      "min": 10000,
      "max": 20000,
      "action": "INFO"
    },
    "version": {
      "min": 1,
      "max": 3,
      "action": "BLOCK"
    }
  }
}
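
A policy matching the error message shown earlier in this section, where read_committed is required, could be introduced in audit-only mode first. This is a sketch: the Interceptor name and the payments- topic prefix are hypothetical, and it assumes that the remaining keys may be omitted.

```json
{
  "name": "myReadCommittedFetchPolicy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.FetchPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "payments-.*",
    "isolationLevel": {
      "value": "read_committed",
      "action": "INFO"
    }
  }
}
```

With INFO, non-compliant fetch requests are still executed and only recorded in the audit log; switching action to BLOCK would then enforce the rule.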

Limit commit offset policy

The limit commit offset policy Interceptor limits the number of commit offset attempts on the same groupId within a minute. If the number of attempts exceeds the configured limit within that period, the Interceptor responds with a PolicyViolationException.

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| groupId | String | .* | Regex for groupId; groupIds that match this regex will have the Interceptor applied. |
| maximumCommitsPerMinute | int |  | Maximum commit offset attempts on the same groupId within a minute |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |

Action

  • BLOCK - when the limit is exceeded, record the violation in the audit log and return an error.
  • INFO - execute the request anyway and record the violation in the audit log.
  • THROTTLE - when the limit is exceeded, record the violation in the audit log and throttle the request for throttleTimeMs.

Example

{
  "name": "limit-commit-offset-policy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
  "priority": 100,
  "config": {
    "groupId": "myGroupId.*",
    "maximumCommitsPerMinute": 5,
    "action": "BLOCK"
  }
}
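
The THROTTLE action uses the throttleTimeMs key from the configuration table. A variant of the example above might look like this sketch, where the value 200 is chosen purely for illustration:

```json
{
  "name": "limit-commit-offset-policy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.LimitCommitOffsetPolicyPlugin",
  "priority": 100,
  "config": {
    "groupId": "myGroupId.*",
    "maximumCommitsPerMinute": 5,
    "action": "THROTTLE",
    "throttleTimeMs": 200
  }
}
```

If throttleTimeMs is left out, the table lists a default of 100. The limit connection attempts and limit join group policies described below document the same action and throttleTimeMs keys.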

Limit connection attempts policy

The limit connection attempts policy Interceptor limits the number of connection attempts within a second, because creating a new connection is expensive. If the number of attempts exceeds the configured limit within that period, the Interceptor responds with a PolicyViolationException.

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| maximumConnectionsPerSecond | int |  | Maximum number of connections allowed within a second |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |

Action

  • BLOCK - when the limit is exceeded, record the violation in the audit log and return an error.
  • INFO - execute the request anyway and record the violation in the audit log.
  • THROTTLE - when the limit is exceeded, record the violation in the audit log and throttle the request for throttleTimeMs.

Example

{
  "name": "myLimitConnectionPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.LimitConnectionPolicyPlugin",
  "priority": 100,
  "config": {
    "maximumConnectionsPerSecond": 5,
    "action": "BLOCK"
  }
}

Limit join group policy

The limit join group policy Interceptor limits the number of joinGroup attempts on the same groupId within a minute. If the number of attempts exceeds the configured limit within that period, the Interceptor responds with a PolicyViolationException.

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| groupId | String | .* | Regex for groupId; groupIds that match this regex will have the Interceptor applied |
| maximumJoinsPerMinute | int |  | Maximum joinGroup attempts on the same groupId within a minute. |
| action | Action |  | Action to take if the value is outside the specified range. |
| throttleTimeMs | int | 100 | Value to throttle with (only applicable when action is set to THROTTLE). |

Action

  • BLOCK - when the limit is exceeded, record the violation in the audit log and return an error.
  • INFO - execute the request anyway and record the violation in the audit log.
  • THROTTLE - when the limit is exceeded, record the violation in the audit log and throttle the request for throttleTimeMs.

Example

{
  "name": "limit-join-group-policy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.LimitJoinGroupPolicyPlugin",
  "priority": 100,
  "config": {
    "groupId": "myGroupId.*",
    "maximumJoinsPerMinute": 5,
    "action": "BLOCK"
  }
}

Message header removal policy

This Interceptor cleans up records by removing unnecessary record headers when messages are consumed. It supports Fetch responses only, and it should run at the end of the Interceptor list.

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied |
| headerKeyRegex | String |  | Regex for record header keys; record headers whose key matches this regex are removed |

Example

{
  "name": "myMessageHeaderRemovalInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.MessageHeaderRemovalPlugin",
  "priority": 100,
  "config": {
    "topic": "topic-.*",
    "headerKeyRegex": "headerKey.*"
  }
}

Produce policy

The produce policy Interceptor imposes limits on incoming messages so that messages going to Kafka adhere to the configured specification.

Sending an invalid request

Any request that doesn't match the Interceptor's configuration will be blocked and return the corresponding error message. For example, suppose you send a record without headers, but the Interceptor is configured with recordHeaderRequired=true. When you send that request to the cluster, the following error is returned:

org.apache.kafka.common.errors.PolicyViolationException: 
Request parameters do not satisfy the configured policy. Headers are required

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied. If no value is set, it will be applied to all topics. |
| acks | Acks |  | Configuration for acks modes |
| recordHeaderRequired | Boolean |  | Configuration of header usage |
| compressions | Compression type |  | Configuration for compression types |
| idempotenceRequired | Boolean |  | Configuration for idempotency usage |
| transactionRequired | Boolean |  | Configuration for transaction usage |
| version | Version |  | Configuration for produce version |

Acks

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | Array integer |  | Only these acks modes are allowed. Allowed values: -1, 0, 1 |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Boolean

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| value | Boolean |  | Value for the configuration. If action is OVERRIDE, this value is used as the override value. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Version

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| min | int |  | Minimum value of produce version |
| max | int |  | Maximum value of produce version |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Compression Type

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| values | Set Compression |  | Set of allowed compression types. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |
| overrideValue | Compression |  | Value to override with (only applicable when action is set to OVERRIDE). |

Compression

  • NONE
  • GZIP
  • SNAPPY
  • LZ4
  • ZSTD

Action

  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the out-of-policy value and record the violation in the audit log.

Example

{
  "name": "myProducePolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic.*",
    "acks": {
      "value": [
        -1,
        0
      ],
      "action": "BLOCK"
    },
    "recordHeaderRequired": {
      "value": true,
      "action": "BLOCK"
    },
    "compressions": {
      "value": [
        "NONE",
        "GZIP"
      ],
      "action": "INFO"
    },
    "idempotenceRequired": {
      "value": true,
      "action": "INFO"
    },
    "transactionRequired": {
      "value": true
    },
    "version": {
      "min": 1,
      "max": 3,
      "action": "BLOCK"
    }
  }
}
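
A stricter, narrower policy could allow only fully acknowledged writes on selected topics. The sketch below uses a hypothetical Interceptor name and payments- topic prefix, and assumes that keys left out of config are not checked.

```json
{
  "name": "myStrictProducePolicy",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducePolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "payments-.*",
    "acks": {
      "value": [-1],
      "action": "BLOCK"
    },
    "idempotenceRequired": {
      "value": true,
      "action": "INFO"
    }
  }
}
```

Here produce requests with acks 0 or 1 would be rejected, while non-idempotent producers would only be recorded in the audit log.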

Producer rate limiting policy

Kafka uses per-broker quotas to throttle the volume of data reaching each broker. Throttling across the cluster is not possible with default Apache Kafka.

Additionally, if you are using a hosted Kafka instance, you may not have access to the Kafka configuration to set quotas.

This Interceptor addresses both limitations by limiting throughput on a per-Gateway basis, throttling produce throughput either globally or per vcluster (tenant).

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| maximumBytesPerSecond | int |  | Maximum number of bytes allowed to be produced within a second |
| action | Action | BLOCK | Action to take if the value is outside the specified range |

Action

  • BLOCK - when the threshold is reached, throttle and save an error in the audit log.
  • INFO - when the threshold is reached, do not throttle but save a warning in the audit log.

Example

{
  "name": "myProducerRateLimitingPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ProducerRateLimitingPolicyPlugin",
  "priority": 100,
  "config": {
    "maximumBytesPerSecond": 500,
    "action": "BLOCK"
  }
}

maximumBytesPerSecond is the maximum number of bytes that can be produced in any one second before throttling applies. In the example above, only 500 bytes per second can be produced before requests are throttled.

Read-only topic policy

The read-only topic policy Interceptor allows you to define some topics to be Read-only. This means that any mutating requests are denied. For example, produce requests are blocked, as are any requests that alter or delete topics.

The full list of Kafka API requests that this Interceptor blocks for the specified topics is:

  • ProduceRequest
  • DeleteTopicsRequest
  • AlterConfigsRequest
  • AlterPartitionReassignmentsRequest
  • AlterPartitionRequest
  • CreatePartitionsRequest
  • IncrementalAlterConfigsRequest
  • DeleteRecordsRequest
  • ElectLeadersRequest
  • AlterReplicaLogDirsRequest

Sending a request to a read-only topic

If an attempt is made to send a request to a read-only topic, an error such as the following is returned:

org.apache.kafka.common.errors.TopicAuthorizationException: 
Not authorized to access topics: [topic name]

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied. |
| action | Action | BLOCK | Action to take if the value is outside the specified range. |

Action

  • BLOCK - when a mutating request targets a read-only topic, record the violation in the audit log and return an error.
  • INFO - execute the request anyway and record the violation in the audit log.

Example

{
  "name": "myReadOnlyTopicPolicyPlugin",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.ReadOnlyTopicPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "client_topic_.*",
    "action": "BLOCK"
  }
}

Topic required schema ID policy

Requiring every record sent through your Kafka system to have an associated schema ensures that data arrives in a known format for your Kafka consumers. Records with missing schemas can cause application outages, as consumers may be unable to process the unexpected record format.

The topic required schema ID policy Interceptor ensures that all records produced to Kafka have a schema set. Learn about schema registry and schema-id.

Sending an invalid record

The topic required schema ID policy Interceptor returns the following errors when an invalid record is sent:

| Key | Description |
| --- | --- |
| schemaIdRequired: true | When sending a record without schemaId: Request parameters do not satisfy the configured policy. SchemaId is required. |
| schemaIdRequired: false | When sending a record with schemaId: Request parameters do not satisfy the configured policy. SchemaId is not allowed. |

Configuration

| Key | Type | Default | Description |
| --- | --- | --- | --- |
| topic | String | .* | Topics that match this regex will have the Interceptor applied |
| schemaIdRequired | Boolean |  | Whether records must (true) or must not (false) have a schemaId |
| action | Action | BLOCK | Action to take if the value is outside the specified range |

Action

  • BLOCK - when validation fails, record the violation in the audit log and return an error.
  • INFO - execute the request with the invalid record and record the violation in the audit log.

Example

{
  "name": "myTopicRequiredSchemaIdPolicyInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.TopicRequiredSchemaIdPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "topic_1.*",
    "schemaIdRequired": true,
    "action": "BLOCK"
  }
}
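
The inverse rule from the error table above, where records must not carry a schemaId, can be sketched the same way. The Interceptor name and the raw_ topic prefix are hypothetical.

```json
{
  "name": "myNoSchemaIdPolicyInterceptor",
  "pluginClass": "io.conduktor.gateway.interceptor.safeguard.TopicRequiredSchemaIdPolicyPlugin",
  "priority": 100,
  "config": {
    "topic": "raw_.*",
    "schemaIdRequired": false,
    "action": "INFO"
  }
}
```

With action set to INFO, records that do carry a schemaId are still produced, and the "SchemaId is not allowed" violation is only recorded in the audit log.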