Interceptors
Conduktor Gateway offers a number of powerful Interceptors that enhance your Kafka usage. For example, you can use them to:
- perform full-body or field-level encryption and decryption
- reject (during produce) or skip (during consume) records that don't match specified data quality rules
- enforce producer configurations such as acks or compression
- override or enforce configurations during a CreateTopic request, such as a replication factor or naming convention
Configure and use
To deploy an Interceptor, you need to prepare its configuration. Here's an example for an interceptor that will block the creation of topics with more than six partitions:
- CLI
- API
curl \
--request PUT \
--url 'http://localhost:8888/gateway/v2/interceptor' \
--header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
--header 'Content-Type: application/json' \
--data-raw '{
"kind" : "Interceptor",
"apiVersion" : "gateway/v2",
"metadata" : {
"name" : "less-than-6-partitions"
},
"spec" : {
"pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
"priority": 100,
"config": {
"topic": ".*",
"numPartition": {
"min": 1,
"max": 6,
"action": "BLOCK"
}
}
}
}'
POST /admin/interceptors/v1/interceptor/enforce-partition-limit
{
"pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
"priority": 100,
"config": {
"topic": ".*",
"numPartition": {
"min": 1,
"max": 6,
"action": "BLOCK"
}
}
}
Interceptors also combine with each other to create very powerful interactions and solve many interesting use-cases in different ways.
One option is to chain them together, so that each Interceptor performs its action sequentially and independently, then passes the result to the next Interceptor to action.
The order of execution is determined by the priority of each Interceptor. Lower numbers gets executed first.
More advanced behaviors can also be configured such as Scoping and Overriding. They are presented in the detailed Interceptor Concepts page.
Chain Interceptors
Interceptors can be chained, allowing you to create powerful interactions for various scenarios.
Each Interceptor can have a distinct purpose that's unrelated to other Interceptors in the chain. Interceptors are executed in order of priority, starting with the lowest number. Interceptor actions are performed sequentially and independently, passing the results from one to the next one in the chain.
The order of execution is calculated after scoping and overriding. For example, an overridden Interceptor can have a different priority from its parent.
Interceptor scope
Interceptor scoping lets you define affected Kafka clients (ultimately resolved as service accounts).
There are four targeting scopes:
- Global
- VirtualCluster
- Group
- ServiceAccount
See resource reference details.
Example:
// This interceptor only applies to service account 'sa-clickstream'
curl \
--request PUT \
--url 'http://localhost:8888/gateway/v2/interceptor' \
--header 'Authorization: Basic YWRtaW46Y29uZHVrdG9y' \
--header 'Content-Type: application/json' \
--data-raw '{
"kind" : "Interceptor",
"apiVersion" : "gateway/v2",
"metadata" : {
"name" : "less-than-6-partitions",
"scope": {
"username": "sa-clickstream"
}
},
"spec" : {
"pluginClass": "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin",
"priority": 100,
"config": {
"topic": ".*",
"numPartition": {
"min": 1,
"max": 6,
"action": "BLOCK"
}
}
}
}'
Overriding
Interceptor overriding lets you change the behavior of an Interceptor by redeploying it with the same name but under a different scope. This will effectively override the lower precedence Interceptors.
The order of precedence from highest (overrides all others) to lowest (easiest to override) is:
- ServiceAccount
- Group
- VirtualCluster
- Global
In the two JSON examples above, both Interceptors have the same name (enforce-partition-limit
) but two different scopes: the first one is global, the second one is targeting user sa-clickstream
. These Interceptors aren't chained but the second one is overriding the first one. The sa-clickstream
service account will be allowed to create topics with 1 to 20 partitions, while other service accounts will be limited to six. If these Interceptors had different names, they would be chained, so the first one would enforce the restriction to 6 partitions.
Interceptor interaction example
Here's an example combining Interceptors chaining, scoping and overriding:
interceptor-C
is deployed only for Alice (scoping)interceptor-D
is deployed globally (scoping) but also deployed specifically for Bob (overriding)interceptor-A
andinterceptor-B
are deployed globally (scoping)- the priorities (
01
,40
,45
and50
) are then considered for the final execution order (chaining) (img/interceptor-example.png)
When you need Interceptors to apply conditionally, targeting by Service Account is the most straightforward way.
Interceptor
API key(s): Admin API key Managed with: API CLI Console UI
Deploys an Interceptor on the Gateway
---
apiVersion: gateway/v2
kind: Interceptor
metadata:
name: enforce-partition-limit
# scope:
# vCluster: aaa
# group: bbb
# username: ccc
spec:
pluginClass: "io.conduktor.gateway.interceptor.safeguard.CreateTopicPolicyPlugin"
priority: 100
config:
topic: "myprefix-.*"
numPartition:
min: 5
max: 5
action: "INFO"
Interceptor checks:
metadata.scope
is optional (default empty).metadata.scope.[vCluster | group | username]
combine with each other to define the targeting- Check the dedicated Interceptor Targeting section
spec.pluginClass
is mandatory. Must be a valid Interceptor class name from our available Interceptorsspec.priority
is mandatoryspec.config
is a valid config for thepluginClass
Interceptor targeting
You can activate your Interceptor only in specific scenarios. Use the table below to configure Targeting settings.
Use case | metadata.scope.vcluster | metadata.scope.group | metadata.scope.username |
---|---|---|---|
Global Interceptor (Including Virtual Clusters) | Set to null | Set to null | Set to null |
Global Interceptor (Excluding Virtual Clusters) | Empty | Empty | Empty |
Username Targeting | Empty | Empty | Set |
Group Targeting | Empty | Set | Empty |
Virtual Cluster Targeting | Set | Empty | Empty |
Virtual Cluster + Username Targeting | Set | Empty | Set |
Virtual Cluster + Group Targeting | Set | Set | Empty |
You can deploy multiple interceptors with the same name using a different targeting scope. This will effectively override the configuration for the scope.
The order of precedence from highest (overrides all others) to lowest (most easily overridden) is:
- ServiceAccount
- Group
- VirtualCluster
- Global
Examples
---
# This interceptor targets everyone (Including Virtual Clusters)
apiVersion: gateway/v2
kind: Interceptor
metadata:
name: enforce-partition-limit
scope:
vCluster: null
group: null
username: null
spec:
---
# This interceptor targets everyone (Excluding Virtual Clusters)
apiVersion: gateway/v2
kind: Interceptor
metadata:
name: enforce-partition-limit
spec:
---
# This interceptor targets only `admin` service account
apiVersion: gateway/v2
kind: Interceptor
metadata:
name: enforce-partition-limit
scope:
username: admin
spec:
---
# This interceptor targets only `read-only` virtual cluster
apiVersion: gateway/v2
kind: Interceptor
metadata:
name: enforce-partition-limit
scope:
vCluster: read-only
spec:
GatewayServiceAccount
GatewayServiceAccount is generally optional when using Oauth, mTLS or Delegated Backing Kafka authentication.
GatewayServiceAccount resource is enabled or not depending on your Gateway configuration. This is to prevent you to declare a resource that is incompatible with your current configuration:
GATEWAY_SECURITY | LOCAL GatewayServiceAccount | EXTERNAL GatewayServiceAccount |
---|---|---|
PLAINTEXT | 🚫 | 🚫 |
SSL | 🚫 | only if mTls |
SASL_PLAINTEXT | ✅ | only if OAuth configured |
SASL_SSL | ✅ | only if OAuth configured |
DELEGATED_SASL_PLAINTEXT | 🚫 | ✅ |
DELEGATED_SASL_SSL | 🚫 | ✅ |
There are a few cases where you must declare GatewayServiceAccount objects:
- Creating Local Service Accounts
- Renaming Service Accounts for easier clarity when using Interceptors
- Attaching Service Accounts to Virtual Clusters
---
# External User renamed
apiVersion: gateway/v2
kind: GatewayServiceAccount
metadata:
name: application1
spec:
type: EXTERNAL
externalNames:
- 00u9vme99nxudvxZA0h7
---
# Local User on Virtual Cluster vc-B
apiVersion: gateway/v2
kind: GatewayServiceAccount
metadata:
vCluster: vc-B
name: admin
spec:
type: LOCAL
GatewayServiceAccount checks:
- When
spec.type
isEXTERNAL
:spec.externalNames
must be a non-empty list of external names. Each name must be unique across all declared GatewayServiceAccount.- At the moment we only support a list of one element. Support for multiple externalNames will be added in the future.
GatewayServiceAccount side effects:
- When
spec.type
isEXTERNAL
:- During Client connection, the authenticated user will be checked against the list of
externalNames
to decide which GatewayServiceAccount it is.
- During Client connection, the authenticated user will be checked against the list of
- When
spec.type
isLOCAL
:- Access to
/gateway/v2/tokens
endpoint to generate a password for this Service Account - Switching a GatewayServiceAccount
spec.type
fromLOCAL
toEXTERNAL
does not invalidate previously emitted tokens. They will keep on working for their TTL.
- Access to
GatewayGroup
Gateway Group lets you add multiple users in the same GatewayGroup for easier Interceptor targeting capabilities.
---
# Users added to the group manually
apiVersion: gateway/v2
kind: GatewayGroup
metadata:
name: group-a
spec:
members:
- name: admin
- vCluster: vc-B
name: "0000-AAAA-BBBB-CCCC"
GatewayGroup checks:
spec.members[].name
is mandatory.- Currently, the username needs to refer to an existing GatewayServiceAccount otherwise it will fail. This is a known issue that we'll address in a further release.
spec.members[].vCluster
is optional. Must refer to an existing Virtual Cluster. When not using Virtual Clusters, don't set this attribute.
GatewayGroup side effects:
- All members of the group will be affected by Interceptors deployed with this group's scope.
ConcentrationRule
Concentration Rules allow you to define patterns where topic creation won't generate a physical topic, but will instead use our Topic Concentration feature.
---
apiVersion: gateway/v2
kind: ConcentrationRule
metadata:
# vCluster: vc-B
name: toutdanstiti
spec:
pattern: titi-.*
physicalTopics:
delete: titi-delete
compact: titi-compact
deleteCompact: titi-cd
autoManaged: false
offsetCorrectness: false
ConcentrationRule checks:
metadata.vCluster
is optional. Must refer to an existing Virtual Cluster. When not using Virtual Clusters, don't set this attribute.spec.physicalTopics.delete
is mandatory. Must be a valid topic name with acleanup.policy
set todelete
spec.physicalTopics.compact
is optional. Must be a valid topic name with acleanup.policy
set tocompact
spec.physicalTopics.deleteCompact
is optional. Must be a valid topic name with acleanup.policy
set todelete,compact
spec.autoManaged
is optional, defaultfalse
spec.offsetCorrectness
is optional, defaultfalse
ConcentrationRule side effects:
- Once the Concentration Rule is deployed, topics created with a name matching the
spec.pattern
will not be created as real Kafka topics but as Concentrated Topics instead. - Depending on the topic's
cleanup.policy
, the topic's data will be stored in one of the configured physical topics. - If a topic creation request is made with a
cleanup.policy
that isn't configured in the ConcentrationRule, topic creation will fail. - It is not possible to update
cleanup.policy
of a concentrated topic. - If
spec.autoManaged
is set totrue
, the underlying physical topics and configurations will be automatically created and/or extended to honour the topics configurations. - If
spec.offsetCorrectness
is set totrue
, Gateway will maintain a list of offsets for each of the Concentrated Topic records.- This allows for a proper calculation of Message Count and Consumer Group Lag.
- There are some limitation. Read more about Offset Correctness here
- If
spec.offsetCorrectness
is set tofalse
, Gateway will report the offsets of the backing topic records.
If a ConcentrationRule spec changes, it will not affect previously created concentrated topics, it will only affect the topics created after the change.