Skip to content

Commit

Permalink
feat: add support for prefixed resources for Group principals (#73)
Browse files Browse the repository at this point in the history
* feat: add support for prefixed resources for Group principals

* chore: remove unused consumer
  • Loading branch information
sverrehu committed Sep 24, 2024
1 parent 742e34c commit 689916c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
Expand Down Expand Up @@ -66,7 +68,8 @@ private static void overrideResultsByGroup(final Authorizer authorizer, final Au
}

private static AuthorizationResult authorize(final Authorizer authorizer, final Set<String> groups, final Action action) {
final ResourcePatternFilter resourcePatternFilter = action.resourcePattern().toFilter();
final ResourcePattern resourcePattern = action.resourcePattern();
final ResourcePatternFilter resourcePatternFilter = new ResourcePatternFilter(resourcePattern.resourceType(), resourcePattern.name(), PatternType.MATCH);
final AccessControlEntryFilter accessControlEntryFilter = new AccessControlEntryFilter(null, null, action.operation(), AclPermissionType.ANY);
final AclBindingFilter aclBindingFilter = new AclBindingFilter(resourcePatternFilter, accessControlEntryFilter);
final Iterable<AclBinding> acls = authorizer.acls(aclBindingFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public abstract class AbstractLdapAuthenticateCallbackHandlerIntegrationIT {

private static final String TOPIC_WITH_USER_ALLOW = "topic_with_user_principal";
private static final String TOPIC_WITH_GROUP_ALLOW = "topic_with_group_principal";
private static final String TOPIC_PREFIX_WITH_USER_ALLOW = "topic_with_user_principal";
private static final String TOPIC_PREFIX_WITH_GROUP_ALLOW = "topic_with_group_principal";
private static final String PREFIXED_TOPIC_WITH_USER_ALLOW = TOPIC_PREFIX_WITH_USER_ALLOW + "-topic";
private static final String PREFIXED_TOPIC_WITH_GROUP_ALLOW = TOPIC_PREFIX_WITH_GROUP_ALLOW + "-topic";
private static final String JAAS_ADMIN_USER_LINE = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka\" user_kafka=\"kafka\";";
private static final String ANY_HOST = "*";
private static LdapServer ldapServer;
Expand Down Expand Up @@ -100,6 +104,11 @@ private void setupTestTopicsAndAcls() {
addTopic(TOPIC_WITH_GROUP_ALLOW);
addProducer(TOPIC_WITH_GROUP_ALLOW, "Group:" + LdapServer.PRODUCERS_GROUP);
addDeniedProducer(TOPIC_WITH_GROUP_ALLOW, "Group:" + LdapServer.DENIED_PRODUCERS_GROUP);
addTopic(PREFIXED_TOPIC_WITH_USER_ALLOW);
addProducerForPrefixedTopic(TOPIC_PREFIX_WITH_USER_ALLOW, "User:" + LdapServer.PRODUCER_WITH_USER_ALLOW_USER_PASS);
addTopic(PREFIXED_TOPIC_WITH_GROUP_ALLOW);
addProducerForPrefixedTopic(TOPIC_PREFIX_WITH_GROUP_ALLOW, "Group:" + LdapServer.PRODUCERS_GROUP);
addDeniedProducer(PREFIXED_TOPIC_WITH_GROUP_ALLOW, "Group:" + LdapServer.DENIED_PRODUCERS_GROUP);
}

private void assertLdapAuthenticationWorks() {
Expand All @@ -126,6 +135,13 @@ public final void shouldProduceWhenProducerByUser() {
}
}

@Test
public final void shouldProduceOnPrefixedTopicWhenProducerByUser() {
try (final Producer<Integer, String> producer = getProducer(LdapServer.PRODUCER_WITH_USER_ALLOW_USER_PASS)) {
produce(producer, PREFIXED_TOPIC_WITH_USER_ALLOW, "foo");
}
}

@Test
public final void shouldNotProduceWhenNotProducerByGroup() {
Assertions.assertThrows(TopicAuthorizationException.class, () -> {
Expand All @@ -151,6 +167,31 @@ public final void shouldProduceWhenProducerByGroup() {
}
}

@Test
public final void shouldNotProduceOnPrefixedTopicWhenNotProducerByGroup() {
Assertions.assertThrows(TopicAuthorizationException.class, () -> {
try (final Producer<Integer, String> producer = getProducer(LdapServer.NON_PRODUCER_USER_PASS)) {
produce(producer, PREFIXED_TOPIC_WITH_GROUP_ALLOW, "foo");
}
});
}

@Test
public final void shouldNotProduceOnPrefixedTopicWhenInADeniedGroupEvenIfInAllowedGroup() {
Assertions.assertThrows(TopicAuthorizationException.class, () -> {
try (final Producer<Integer, String> producer = getProducer(LdapServer.PRODUCER_WITH_GROUP_DENY_USER_PASS)) {
produce(producer, PREFIXED_TOPIC_WITH_GROUP_ALLOW, "foo");
}
});
}

@Test
public final void shouldProduceOnPrefixedTopicWhenProducerByGroup() {
try (final Producer<Integer, String> producer = getProducer(LdapServer.PRODUCER_WITH_GROUP_ALLOW_USER_PASS)) {
produce(producer, PREFIXED_TOPIC_WITH_GROUP_ALLOW, "foo");
}
}

private Producer<Integer, String> getProducer(final String userPass) {
return getProducer(userPass, userPass);
}
Expand Down Expand Up @@ -194,10 +235,23 @@ private void addProducer(final String topicName, final String principal, final A
}
}

private void addProducerForPrefixedTopic(final String topicPrefix, final String principal) {
final AclBinding describeAclBinding = createPrefixedBinding(topicPrefix, principal, AclOperation.DESCRIBE, AclPermissionType.ALLOW);
final AclBinding writeAclBinding = createPrefixedBinding(topicPrefix, principal, AclOperation.WRITE, AclPermissionType.ALLOW);
final Collection<AclBinding> aclBindings = Arrays.asList(describeAclBinding, writeAclBinding);
try (final Admin admin = getSuperAdmin()) {
admin.createAcls(aclBindings);
}
}

private AclBinding createLiteralBinding(final String topicName, final String principal, final AclOperation operation, final AclPermissionType permissionType) {
return createBinding(topicName, PatternType.LITERAL, principal, operation, permissionType);
}

private AclBinding createPrefixedBinding(final String topicPrefix, final String principal, final AclOperation operation, final AclPermissionType permissionType) {
return createBinding(topicPrefix, PatternType.PREFIXED, principal, operation, permissionType);
}

private AclBinding createBinding(final String topicName, final PatternType patternType, final String principal, final AclOperation operation, final AclPermissionType permissionType) {
final ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, topicName, patternType);
final AccessControlEntry accessControlEntry = new AccessControlEntry(principal, ANY_HOST, operation, permissionType);
Expand Down

0 comments on commit 689916c

Please sign in to comment.