@@ -70,6 +70,8 @@ public void doTest() throws Exception {
7070
7171 testAllConfigOptions ();
7272
73+ testSaslExtensions ();
74+
7375 testAccessTokenLocation ();
7476
7577 testRefreshTokenLocation ();
@@ -93,7 +95,7 @@ private void testAllConfigOptions() throws IOException {
9395 attrs .put (ClientConfig .OAUTH_PASSWORD_GRANT_PASSWORD , "password" );
9496 attrs .put (ClientConfig .OAUTH_USERNAME_CLAIM , "username-claim" );
9597 attrs .put (ClientConfig .OAUTH_FALLBACK_USERNAME_CLAIM , "fallback-username-claim" );
96- attrs .put (ClientConfig .OAUTH_FALLBACK_USERNAME_PREFIX , "username-prefix" );
98+ attrs .put (ClientConfig .OAUTH_FALLBACK_USERNAME_PREFIX , "fallback- username-prefix" );
9799 attrs .put (ClientConfig .OAUTH_SCOPE , "scope" );
98100 attrs .put (ClientConfig .OAUTH_AUDIENCE , "audience" );
99101 attrs .put (ClientConfig .OAUTH_ACCESS_TOKEN_IS_JWT , "false" );
@@ -104,6 +106,8 @@ private void testAllConfigOptions() throws IOException {
104106 attrs .put (ClientConfig .OAUTH_HTTP_RETRY_PAUSE_MILLIS , "500" );
105107 attrs .put (ClientConfig .OAUTH_ENABLE_METRICS , "true" );
106108 attrs .put (ClientConfig .OAUTH_INCLUDE_ACCEPT_HEADER , "false" );
109+ attrs .put (ClientConfig .OAUTH_SASL_EXTENSION_PREFIX + "poolid" , "poolid-value" );
110+ attrs .put (ClientConfig .OAUTH_SASL_EXTENSION_PREFIX + "group.ref" , "group-ref-value" );
107111
108112
109113 AppConfigurationEntry jaasConfig = new AppConfigurationEntry ("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule" , AppConfigurationEntry .LoginModuleControlFlag .REQUIRED , attrs );
@@ -116,6 +120,18 @@ private void testAllConfigOptions() throws IOException {
116120 LogLineReader logReader = new LogLineReader (Common .LOG_PATH );
117121 logReader .readNext ();
118122
123+ try {
124+ loginHandler .configure (clientProps , "OAUTHBEARER" , Collections .singletonList (jaasConfig ));
125+ } catch (Exception e ) {
126+ Assert .assertTrue ("Is a ConfigException" , e instanceof ConfigException );
127+ Assert .assertTrue ("Invalid sasl extension key: " + e .getMessage (), e .getMessage ().contains ("Invalid sasl extension key: 'group.ref'" ));
128+ }
129+
130+ logReader .readNext ();
131+
132+ attrs .remove (ClientConfig .OAUTH_SASL_EXTENSION_PREFIX + "group.ref" );
133+ attrs .put (ClientConfig .OAUTH_SASL_EXTENSION_PREFIX + "group" , "group-ref-value" );
134+
119135 loginHandler .configure (clientProps , "OAUTHBEARER" , Collections .singletonList (jaasConfig ));
120136
121137 Common .checkLog (logReader , "configId" , "config-id" ,
@@ -139,7 +155,8 @@ private void testAllConfigOptions() throws IOException {
139155 "retries" , "3" ,
140156 "retryPauseMillis" , "500" ,
141157 "enableMetrics" , "true" ,
142- "includeAcceptHeader" , "false" );
158+ "includeAcceptHeader" , "false" ,
159+ "saslExtensions" , "\\ {poolid=poolid-value, group=group-ref-value\\ }" );
143160
144161
145162 // we could not check tokenEndpointUri and token in the same run
@@ -357,6 +374,34 @@ private void testValidConfigurations() {
357374 }
358375 }
359376
377+ private void testSaslExtensions () throws Exception {
378+ String testClient = "testclient" ;
379+ String testSecret = "testsecret" ;
380+
381+ changeAuthServerMode ("jwks" , "mode_200" );
382+ changeAuthServerMode ("token" , "mode_200" );
383+ createOAuthClient (testClient , testSecret );
384+
385+ Map <String , String > oauthConfig = new HashMap <>();
386+ oauthConfig .put (ClientConfig .OAUTH_TOKEN_ENDPOINT_URI , TOKEN_ENDPOINT_URI );
387+ oauthConfig .put (ClientConfig .OAUTH_CLIENT_ID , testClient );
388+ oauthConfig .put (ClientConfig .OAUTH_CLIENT_SECRET , testSecret );
389+ oauthConfig .put (ClientConfig .OAUTH_SSL_TRUSTSTORE_LOCATION , "../docker/target/kafka/certs/ca-truststore.p12" );
390+ oauthConfig .put (ClientConfig .OAUTH_SSL_TRUSTSTORE_PASSWORD , "changeit" );
391+ oauthConfig .put (ClientConfig .OAUTH_SASL_EXTENSION_PREFIX + "extoption" , "optionvalue" );
392+
393+ LogLineReader logReader = new LogLineReader (Common .LOG_PATH );
394+ logReader .readNext ();
395+
396+ // If it fails with 'Unknown signing key' it means that Kafka has not managed to load JWKS keys yet
397+ // due to jwks endpoint returning status 404 initially
398+ initJaasWithRetry (oauthConfig );
399+
400+ List <String > lines = logReader .readNext ();
401+ // Check in the log that SASL extensions have been properly set
402+ checkLogForRegex (lines , ".*LoginManager.*extensionsMap=\\ {extoption=optionvalue\\ }.*" );
403+ }
404+
360405 private void testAccessTokenLocation () throws Exception {
361406
362407 String testClient = "testclient" ;
@@ -369,7 +414,7 @@ private void testAccessTokenLocation() throws Exception {
369414 String accessToken = loginWithClientSecret (TOKEN_ENDPOINT_URI , testClient , testSecret , "../docker/target/kafka/certs/ca-truststore.p12" , "changeit" );
370415
371416 Path accessTokenFilePath = Paths .get ("target/access_token_file" );
372- Files .write (accessTokenFilePath , accessToken .getBytes (StandardCharsets .UTF_8 ), StandardOpenOption .CREATE_NEW );
417+ Files .write (accessTokenFilePath , accessToken .getBytes (StandardCharsets .UTF_8 ), StandardOpenOption .CREATE , StandardOpenOption . TRUNCATE_EXISTING );
373418 try {
374419 LogLineReader logReader = new LogLineReader (Common .LOG_PATH );
375420 logReader .readNext ();
0 commit comments