Is it possible to connect to a GCP Managed Service for Apache Kafka cluster with SASL/OAUTHBEARER?

I’m trying to set up grepplabs/kafka-proxy on GKE with Workload Identity to connect to GCP Managed Service for Apache Kafka via SASL/OAUTHBEARER. I got SASL/PLAIN authentication working with an access token, but we want to use OAUTHBEARER to avoid static keys. I’ve tried all kinds of tokens, but I keep getting an “Access token is not a Google OAuth token” error.

These are the tokens that I tried passing to this function:

import (
	"encoding/base64"
	"encoding/json"
	"os"
	"strings"
	"time"

	"golang.org/x/oauth2/google"
)

// b64Encode base64url-encodes without padding, as required for JWT segments.
func b64Encode(s string) string {
	return base64.RawURLEncoding.EncodeToString([]byte(s))
}

func getJWT(creds *google.Credentials) (string, error) {
	// Fetch a fresh access token from the credentials.
	token, err := creds.TokenSource.Token()
	if err != nil {
		return "", err
	}

	email := os.Getenv("GCP_SA_NAME")

	// Unsigned header for the GOOG_OAUTH2_TOKEN scheme.
	headerPayload := map[string]interface{}{
		"typ": "JWT",
		"alg": "GOOG_OAUTH2_TOKEN",
	}

	payload := map[string]interface{}{
		"exp":   token.Expiry.Unix(),
		"iat":   time.Now().UTC().Unix(),
		"iss":   "Google",
		"sub":   email,
		"scope": "kafka",
	}

	headerJSON, err := json.Marshal(headerPayload)
	if err != nil {
		return "", err
	}
	payloadJSON, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}

	// The third segment is the base64url-encoded access token itself,
	// in place of a signature.
	return strings.Join([]string{
		b64Encode(string(headerJSON)),
		b64Encode(string(payloadJSON)),
		b64Encode(token.AccessToken),
	}, "."), nil
}

Would greatly appreciate if someone can point me in the right direction. Thank you!


Hi @mhan38 ,

Welcome to Google Cloud Community!

You can connect to Google Cloud Managed Service for Apache Kafka using the standard open-source Apache Kafka API. GCP’s managed Kafka service supports two authentication methods: SASL/OAUTHBEARER and SASL/PLAIN.
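For reference, the OAUTHBEARER setup for the Java client looks roughly like the snippet below. This is a sketch from the configuration guide’s approach, assuming Google’s managed Kafka auth login handler is on the classpath; please double-check the exact class and property names against the guide:

```properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
# Login callback handler from Google's managed Kafka auth library,
# which exchanges Application Default Credentials for a broker token.
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
```

With this in place, the client picks up Application Default Credentials automatically, so no static keys are needed.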

If you’re setting things up, check out the official configuration guide for step-by-step instructions on how to get connected.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.

I couldn’t get OAUTHBEARER working from Node.js either. An actual working code example in any language would be really helpful.

Subject: SASL/OAUTHBEARER Fails for GKE Workload Identity to Managed Kafka

ERROR: The connection consistently fails during the SASL handshake, resulting in either a client-side `KafkaError{code=_MSG_TIMED_OUT}` or a broker-side `SaslAuthenticationException: Authentication failed due to invalid credentials`.

WIF is enabled on the cluster (`workload_pool` is set) and the node pool (`mode: GKE_METADATA`). A `ServiceAccount` is annotated with `iam.gke.io/return-principal-id-as-email: "true"` and is used by the application pods. A Google Service Account (GSA) has the `roles/managedkafka.client` and `roles/iam.serviceAccountTokenCreator` roles. The Kubernetes Service Account is bound to this GSA with the `roles/iam.workloadIdentityUser` role. The Python client is configured for `SASL_SSL` with `sasl.mechanism: OAUTHBEARER` and uses `google.auth.default()` within an `oauth_cb` callback to provide the authentication token.

I have validated network connectivity (via socket tests), all IAM/GKE/K8s policies and annotations, and Kafka ACLs (including `User:*`)—every component is correctly configured as per the docs.

The authentication fails with both the direct federation (`return-principal-id-as-email` annotation) and the KSA-to-GSA impersonation (`gcp-service-account` annotation) methods of Workload Identity.

Has anyone successfully implemented this specific authentication pattern?