Publishing to Managed Kafka on a Cloud VM using KafkaJS

I am trying to implement a Kafka producer using the kafkajs Node library. I’m running it on a VM that has access to the cluster, which I verified using the Kafka tools as described in the quick start, including publishing and consuming messages. However, I’m unable to make it work from a Node app with ADC (Application Default Credentials); the error is “Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER”. I also verified that the service account has the necessary permissions.

Here’s the complete source:

const { Kafka } = require('kafkajs');
const { ManagedKafkaClient } = require('@google-cloud/managedkafka');

const managedKafka = new ManagedKafkaClient();

const oauthBearerProvider = async () => {
  try {
    const accessToken = await managedKafka.auth.getAccessToken();
    return { value: accessToken };
  } catch (error) {
    console.error('Error getting access token:', error);
    throw error;
  }
};

// Kafka configuration
const kafka = new Kafka({
  clientId: 'connection-test-producer',
  brokers: ['bootstrap.<my-cluster>.us-central1.managedkafka.<my-network>.cloud.goog:9092'],
  ssl: true,
  sasl: {
    mechanism: 'OAUTHBEARER',
    oauthBearerProvider
  }
});

const producer = kafka.producer();

// Wait for the connection to finish before reporting success
producer.connect()
  .then(() => console.log('Connected'))
  .catch((error) => console.error('Connection failed:', error));

You need to format the token. Managed Kafka won’t accept the access token as is. You have to do something similar to the Python example in “Develop a Python producer” in the Google Cloud Managed Service for Apache Kafka documentation.

For example, the code below seems to work. You may also need to add some token refresh logic.

import { GoogleAuth } from 'google-auth-library';

const getHeader = () =>
  JSON.stringify({ typ: 'JWT', alg: 'GOOG_OAUTH2_TOKEN' });
const getJwt = (exp: number, sub: string) => {
  return JSON.stringify({
    exp,
    iss: 'Google',
    iat: Math.floor(Date.now() / 1000),
    sub,
  });
};

// JWT segments are base64url-encoded without '=' padding;
// Node's 'base64url' Buffer encoding produces exactly that.
const b64Encode = (value: string) =>
  Buffer.from(value, 'utf-8').toString('base64url');

export const getToken = async () => {
  const auth = new GoogleAuth();
  const authClient = await auth.getClient();

  const { token: accessToken } = await authClient.getAccessToken();
  const { client_email } = await auth.getCredentials();
  if (!accessToken || !client_email) {
    throw new Error('Could not resolve an access token or a service account email');
  }

  // expiry_date is in epoch milliseconds; fall back to one hour from now
  const { expiry_date } = authClient.credentials;
  const expireTime = Math.floor(
    (expiry_date || Date.now() + 3600 * 1000) / 1000,
  );

  const tokenHeader = b64Encode(getHeader());
  const tokenBody = b64Encode(getJwt(expireTime, client_email));
  const token = b64Encode(accessToken);

  // Three dot-separated segments: header, claims, and the raw access token
  const kafkaAccessToken = [tokenHeader, tokenBody, token].join('.');

  return {
    value: kafkaAccessToken,
  };
};
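
On the token refresh part, here is a rough sketch of one way it could look, not something I have run against a real cluster. kafkajs calls oauthBearerProvider whenever a broker connection authenticates, so a small cache can avoid asking for a new token each time. The names withTokenCache, ExpiringToken, expiresAtMs and skewMs are made up for this example, and it assumes the token fetcher also reports when the token expires.

```typescript
// Shape kafkajs reads from oauthBearerProvider: an object with a `value` string.
type BearerToken = { value: string };
// Hypothetical extension: the token plus its expiry in epoch milliseconds.
type ExpiringToken = BearerToken & { expiresAtMs: number };

// Wraps a token fetcher so that a new token is requested only when the cached
// one is missing or within `skewMs` of expiring. `now` is injectable for tests.
const withTokenCache = (
  fetchToken: () => Promise<ExpiringToken>,
  skewMs: number = 60_000,
  now: () => number = Date.now,
) => {
  let cached: ExpiringToken | undefined;
  let pending: Promise<ExpiringToken> | undefined;

  return async (): Promise<BearerToken> => {
    if (!cached || now() >= cached.expiresAtMs - skewMs) {
      // Share one in-flight request between concurrent callers.
      if (!pending) {
        pending = fetchToken().then(
          (fresh) => {
            pending = undefined;
            return fresh;
          },
          (error) => {
            pending = undefined;
            throw error;
          },
        );
      }
      cached = await pending;
    }
    return { value: cached.value };
  };
};
```

To plug this into the producer, getToken would also have to return its expiry (for example expiresAtMs: expireTime * 1000), and then sasl.oauthBearerProvider could be set to withTokenCache(getToken). The one-minute skew is an arbitrary choice, so adjust it to taste.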