I want to store the message received from the pubsub topic via pull subscription in firestore database in firebase.
so for that I have published the messages from gcp console and deployed the nodejs application for pull subscription on cloud run. Now I want to store that messages in firestore ensuring that no duplicate messages are stored.
I’m stuck in Node js code for storing the message in firebase and also I have already created new database in firestored names logged-user so I need to use that instead of default.
Cloud you please Provide me any steps and code for the same?
To reliably store messages received from a Google Cloud Pub/Sub topic into Firestore using a Node.js application on Cloud Run, follow these steps:
1. Prerequisites
Firestore Database
Ensure you have a Firestore database named logged-user created in your Firebase project.
Cloud Run Service
Your Node.js application should be deployed on Cloud Run.
Service Account
-
Create a Service Account:
-
Assign roles:
roles/pubsub.subscriberandroles/datastore.user. -
Download the JSON key file for this service account.
-
2. Initialize Firebase Admin SDK
Initialize the Firebase Admin SDK in your Node.js application to connect to Firestore using the downloaded service account key.
// index.js
const admin = require('firebase-admin');
const { PubSub } = require('@google-cloud/pubsub');
// Initialize Firebase Admin SDK
admin.initializeApp({
credential: admin.credential.cert('/path/to/your/service-account-key.json'), // Replace with your actual path
});
const db = admin.firestore();
const pubsub = new PubSub();
3. Pull Messages and Store in Firestore
Implement the function to pull messages from the Pub/Sub subscription and store them in Firestore. Ensure no duplicate messages are stored by using the message ID as the Firestore document ID.
// index.js (continued)
const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name
const collectionRef = db.collection('logged-user');
async function listenForMessages() {
const subscription = pubsub.subscription(subscriptionName);
subscription.on('message', async (message) => {
try {
// Optional: Validate message content type
// if (message.attributes && message.attributes['Content-Type'] !== 'application/json') {
// throw new Error('Invalid message content type');
// }
const data = JSON.parse(Buffer.from(message.data, 'utf-8').toString());
const docId = message.id; // Use message ID to ensure uniqueness
await collectionRef.doc(docId).set(data, { merge: true }); // Store or merge data
message.ack();
console.log(`Message ${message.id} processed and stored.`);
} catch (error) {
console.error('Error processing message:', error);
message.nack(); // Retry the message in case of a transient error
}
});
subscription.on('error', (error) => {
console.error('Error listening for messages:', error);
});
}
listenForMessages();
4. Deploy to Cloud Run
Ensure that your Node.js application is correctly set up for Cloud Run deployment and that it uses the service account key for authentication.
- Add
GOOGLE_APPLICATION_CREDENTIALSEnvironment Variable: Ensure your Cloud Run service has the environment variableGOOGLE_APPLICATION_CREDENTIALSset to the path of your service account key file.
Additional Considerations
Error Handling
- Implement Robust Error Handling: Use retry mechanisms with exponential backoff to handle transient errors. Log errors for monitoring and debugging.
Data Validation
- Validate Data: Add data validation to ensure the message data conforms to your expected schema before storing it in Firestore.
Alternative Deduplication
- Track Processed Messages: For high-volume scenarios, consider using a separate Firestore collection to track processed message IDs for faster deduplication.
Scaling
- Monitor and Scale: Monitor your Cloud Run service performance and configure auto-scaling to handle increased message load efficiently.
Security
- Manage Keys and Permissions: Follow best practices for managing service account keys and securing access to cloud resources.
Here’s the complete example for your Node.js application:
// index.js
const admin = require('firebase-admin');
const { PubSub } = require('@google-cloud/pubsub');
// Initialize Firebase Admin SDK
admin.initializeApp({
credential: admin.credential.cert('/path/to/your/service-account-key.json'), // Replace with your actual path
});
const db = admin.firestore();
const pubsub = new PubSub();
const subscriptionName = 'your-subscription-name'; // Replace with your actual subscription name
const collectionRef = db.collection('logged-user');
async function listenForMessages() {
const subscription = pubsub.subscription(subscriptionName);
subscription.on('message', async (message) => {
try {
// Optional: Validate message content type
// if (message.attributes && message.attributes['Content-Type'] !== 'application/json') {
// throw new Error('Invalid message content type');
// }
const data = JSON.parse(Buffer.from(message.data, 'utf-8').toString());
const docId = message.id; // Use message ID to ensure uniqueness
await collectionRef.doc(docId).set(data, { merge: true }); // Store or merge data
message.ack();
console.log(`Message ${message.id} processed and stored.`);
} catch (error) {
console.error('Error processing message:', error);
message.nack(); // Retry the message in case of a transient error
}
});
subscription.on('error', (error) => {
console.error('Error listening for messages:', error);
});
}
listenForMessages();