Wrong Request sent by Kafka to OPA #51

Open · ameyapanse opened this issue Mar 30, 2023 · 5 comments

Comments

@ameyapanse commented Mar 30, 2023

I ran a local Kafka 3.4.0 broker with mTLS enabled using self-signed certificates, and I am using OPA policies as ACLs. The policy is the example posted in the project:
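
For reference, the broker is wired to OPA roughly like this in server.properties (a minimal sketch; the opa.authorizer.url value is my assumption, inferred from the req_path in the OPA log below):

authorizer.class.name=org.openpolicyagent.kafka.OpaAuthorizer
opa.authorizer.url=http://localhost:8181/v1/data/kafka/authz/allow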

Policy

package kafka.authz

import future.keywords.in

default allow = false

allow {
    inter_broker_communication
}

allow {
    consume(input.action)
    on_own_topic(input.action)
    as_consumer
}

allow {
    produce(input.action)
    on_own_topic(input.action)
    as_producer
}

allow {
    create(input.action)
    on_own_topic(input.action)
}

allow {
    any_operation(input.action)
    on_own_topic(input.action)
    as_mgmt_user
}

allow {
    input.action.operation == "READ"
    input.action.resourcePattern.resourceType == "GROUP"
}

allow {
    describe(input.action)
}

inter_broker_communication {
    input.requestContext.principal.name == "ANONYMOUS"
}

inter_broker_communication {
    input.requestContext.securityProtocol == "SSL"
    input.requestContext.principal.principalType == "User"
    username == "localhost"
}

consume(action) {
    action.operation == "READ"
}

produce(action) {
    action.operation == "WRITE"
}

create(action) {
    action.operation == "CREATE"
}

describe(action) {
    action.operation == "DESCRIBE"
}

any_operation(action) {
    action.operation in ["READ", "WRITE", "CREATE", "ALTER", "DESCRIBE", "DELETE"]
}

as_consumer {
    regex.match(".*-consumer", username)
}

as_producer {
    regex.match(".*-producer", username)
}

as_mgmt_user {
    regex.match(".*-mgmt", username)
}

on_own_topic(action) {
    owner := trim(username, "-consumer")
    regex.match(owner, action.resourcePattern.name)
}

on_own_topic(action) {
    owner := trim(username, "-producer")
    regex.match(owner, action.resourcePattern.name)
}

on_own_topic(action) {
    owner := trim(username, "-mgmt")
    regex.match(owner, action.resourcePattern.name)
}

username = cn_parts[0] {
    name := input.requestContext.principal.name
    startswith(name, "CN=")
    parsed := parse_user(name)
    cn_parts := split(parsed.CN, ".")
} else = input.requestContext.principal.name {
    true
}

parse_user(user) = {key: value |
    parts := split(user, ",")
    [key, value] := split(parts[_], "=")
}
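
As a quick sanity check, the policy behaves as expected when evaluated locally with opa eval against a hand-written input (file names here are illustrative): with a LITERAL pattern on bob-topic and principal CN=bob-producer, allow is true.

# Write a minimal input document mirroring what the broker would send
cat > input.json <<'EOF'
{
  "requestContext": {
    "securityProtocol": "SSL",
    "principal": {"principalType": "User", "name": "CN=bob-producer"}
  },
  "action": {
    "resourcePattern": {"resourceType": "TOPIC", "name": "bob-topic", "patternType": "LITERAL"},
    "operation": "WRITE"
  }
}
EOF
# Evaluate the allow rule against it
opa eval --format pretty -d policy.rego -i input.json 'data.kafka.authz.allow'
# prints: true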

Producer

On running a console producer against the topic bob-topic:

bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --producer.config bob.producer.config --topic bob-topic

I get this error:

org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.

This is the request received by OPA from the Kafka broker:

{
  "client_addr": "127.0.0.1:51806",
  "level": "info",
  "msg": "Received request.",
  "req_body": {
    "input": {
      "requestContext": {
        "clientAddress": "/127.0.0.1",
        "clientInformation": {"softwareName": "apache-kafka-java", "softwareVersion": "3.4.0"},
        "connectionId": "127.0.0.1:9094-127.0.0.1:51818-0",
        "header": {
          "name": {"clientId": "console-producer", "correlationId": 2, "requestApiKey": 22, "requestApiVersion": 4},
          "headerVersion": 2
        },
        "listenerName": "SSL",
        "principal": {"principalType": "User", "name": "CN=bob-producer"},
        "securityProtocol": "SSL"
      },
      "action": {
        "resourcePattern": {"resourceType": "TOPIC", "name": "", "patternType": "PREFIXED", "unknown": false},
        "operation": "WRITE",
        "resourceReferenceCount": 0,
        "logIfAllowed": true,
        "logIfDenied": true
      }
    }
  },
  "req_id": 8,
  "req_method": "POST",
  "req_params": {},
  "req_path": "/v1/data/kafka/authz/allow",
  "time": "2023-03-30T11:31:25-07:00"
}

Notice the

{"resourcePattern": {"resourceType": "TOPIC", "name": "", "patternType": "PREFIXED", "unknown": false}}

which I believe is the cause of the error. If it were

{"resourcePattern": {"resourceType": "TOPIC", "name": "bob-topic", "patternType": "LITERAL", "unknown": false}}

then OPA would have allowed the access.

Is this a bug, or am I doing something wrong?

@anderseknert (Member) commented Apr 3, 2023

I'm afraid I don't remember the details of the PREFIXED topic type. @scholzj might be able to answer.

https://play.openpolicyagent.org/p/Fm49HlM0e8

@scholzj (Collaborator) commented Apr 3, 2023

I haven't seen Kafka issue an action against a prefix. If you have a topic named bob-topic, it is basically impossible to create an action against the prefix, as you would need to try all the possible prefixes: an empty string, b, bo, bob, bob-, bob-t, and so on. In my experience, clients always act against a clearly defined topic; the prefix concept is used only internally in ACL rules to simplify them. I also did not see this behavior in my tests.
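
For illustration, the only place I would expect a PREFIXED pattern is in an operator-defined ACL rule on a cluster running Kafka's built-in AclAuthorizer, e.g. something like this (command is illustrative, not taken from this report; admin.config is a placeholder):

bin/kafka-acls.sh --bootstrap-server localhost:9094 \
  --command-config admin.config \
  --add --allow-principal User:bob-producer \
  --operation Write \
  --topic bob- --resource-pattern-type prefixed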

Maybe @ameyapanse can provide the details of the broker configuration, the OPA version used, etc., so we can see whether it is reproducible.

@ameyapanse (Author)

OPA is receiving the PREFIXED topic request from Kafka for some reason. My understanding was that the request should be a LITERAL one.

Broker configs:
advertised.listeners = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name = org.openpolicyagent.kafka.OpaAuthorizer
auto.create.topics.enable = true
auto.include.jmx.reporter = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.heartbeat.interval.ms = 2000
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
broker.session.timeout.ms = 9000
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.listener.names = null
controller.quorum.append.linger.ms = 25
controller.quorum.election.backoff.max.ms = 1000
controller.quorum.election.timeout.ms = 1000
controller.quorum.fetch.timeout.ms = 2000
controller.quorum.request.timeout.ms = 2000
controller.quorum.retry.backoff.ms = 20
controller.quorum.voters = []
controller.quota.window.num = 11
controller.quota.window.size.seconds = 1
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delegation.token.secret.key = null
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
early.start.listeners = null
fetch.max.bytes = 57671680
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 1800000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
initial.broker.registration.timeout.ms = 60000
inter.broker.listener.name = null
inter.broker.protocol.version = 3.4-IV0
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = SSL://localhost:9094
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms = 9223372036854775807
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /tmp/kafka-logs
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 3.0-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connection.creation.rate = 2147483647
max.connections = 2147483647
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1048588
metadata.log.dir = null
metadata.log.max.record.bytes.between.snapshots = 20971520
metadata.log.max.snapshot.interval.ms = 3600000
metadata.log.segment.bytes = 1073741824
metadata.log.segment.min.bytes = 8388608
metadata.log.segment.ms = 604800000
metadata.max.idle.interval.ms = 500
metadata.max.retention.bytes = 104857600
metadata.max.retention.ms = 604800000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
node.id = 0
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
process.roles = []
producer.id.expiration.check.interval.ms = 600000
producer.id.expiration.ms = 86400000
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.window.num = 11
quota.window.size.seconds = 1
remote.log.index.file.cache.total.size.bytes = 1073741824
remote.log.manager.task.interval.ms = 30000
remote.log.manager.task.retry.backoff.max.ms = 30000
remote.log.manager.task.retry.backoff.ms = 500
remote.log.manager.task.retry.jitter = 0.2
remote.log.manager.thread.pool.size = 10
remote.log.metadata.manager.class.name = null
remote.log.metadata.manager.class.path = null
remote.log.metadata.manager.impl.prefix = null
remote.log.metadata.manager.listener.name = null
remote.log.reader.max.pending.tasks = 100
remote.log.reader.threads = 10
remote.log.storage.manager.class.name = null
remote.log.storage.manager.class.path = null
remote.log.storage.manager.impl.prefix = null
remote.log.storage.system.enable = false
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 30000
replica.selector.class = null
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism.controller.protocol = GSSAPI
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
sasl.server.callback.handler.class = null
sasl.server.max.receive.size = 524288
security.inter.broker.protocol = SSL
security.providers = null
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
socket.listen.backlog.size = 50
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = required
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = certs/server/keystore.p12
ssl.keystore.password = [hidden]
ssl.keystore.type = PKCS12
ssl.principal.mapping.rules = DEFAULT
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = certs/server/truststore.p12
ssl.truststore.password = [hidden]
ssl.truststore.type = PKCS12
transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.clientCnxnSocket = null
zookeeper.connect = localhost:2181
zookeeper.connection.timeout.ms = 18000
zookeeper.max.in.flight.requests = 10
zookeeper.metadata.migration.enable = false
zookeeper.session.timeout.ms = 18000
zookeeper.set.acl = false
zookeeper.ssl.cipher.suites = null
zookeeper.ssl.client.enable = false
zookeeper.ssl.crl.enable = false
zookeeper.ssl.enabled.protocols = null
zookeeper.ssl.endpoint.identification.algorithm = HTTPS
zookeeper.ssl.keystore.location = null
zookeeper.ssl.keystore.password = null
zookeeper.ssl.keystore.type = null
zookeeper.ssl.ocsp.enable = false
zookeeper.ssl.protocol = TLSv1.2
zookeeper.ssl.truststore.location = null
zookeeper.ssl.truststore.password = null
zookeeper.ssl.truststore.type = null

OPA version: 0.49.2
OpaAuthorizer version: 1.5.1

@scholzj (Collaborator) commented Apr 3, 2023

OK, so you start a single-broker Kafka cluster with this config and run this command:

bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --producer.config bob.producer.config --topic bob-topic

Without any other tools or commands running in parallel?

And you get that request? What is in bob.producer.config?

@ameyapanse (Author)

cat bob.producer.config

security.protocol=SSL

ssl.keystore.location=certs/bob/producer/keystore.p12
ssl.keystore.password=NotASecret
ssl.key.password=NotASecret
ssl.truststore.location=certs/bob/producer/truststore.p12
ssl.truststore.password=NotASecret

ssl.keystore.type=PKCS12
ssl.truststore.type=PKCS12
ssl.endpoint.identification.algorithm=
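
One way to double-check that the client keystore's subject CN is what the policy parses into username (output formatting varies by OpenSSL version; OpenSSL 3 may need -legacy for some PKCS12 files):

# Extract the client certificate from the keystore and print its subject
openssl pkcs12 -in certs/bob/producer/keystore.p12 -passin pass:NotASecret -nokeys \
  | openssl x509 -noout -subject
# expect a subject CN of bob-producer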
