Yes, you should be able to use null as a default value for an Avro schema field in Google Cloud Pub/Sub. Your schema specifies that the type of the field can be either a “string” or “null”, which is a valid union type in Avro. Here’s an example of a correct Avro schema with a null default value:
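For example, a minimal schema along these lines should work (the record name "Message" is illustrative; "field1" is the field name from your messages):

```json
{
  "type": "record",
  "name": "Message",
  "fields": [
    { "name": "field1", "type": ["null", "string"], "default": null }
  ]
}
```

Note that "null" is listed first in the union and the default is `null`.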
In Avro, the order of types in a union matters: the default value of a field must match the first type listed in the union. So to use a null default, make sure “null” is listed first, before “string”.
Using the schema you suggested, I am able to publish a null value to Google Pub/Sub, but when I try to publish an actual string value, it causes an error. The stack trace is below for your reference.
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
at com.google.common.util.concurrent.Futures$4.run(Futures.java:1126)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:399)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:902)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:813)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:677)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541)
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.
at io.grpc.Status.asRuntimeException(Status.java:539)
From the stack trace you provided, it appears that you’re running into an INVALID_ARGUMENT error due to a schema validation failure when trying to publish a non-null (i.e., string) value to a Pub/Sub topic.
This error typically occurs when the data being published does not match the schema defined for the topic. With the Avro schema that you’re using, both null and string values should be valid for the field1 field.
There could be an issue with how the data is being serialized before it’s published to the topic. In Avro’s JSON encoding, if a field is declared as a union of types, a non-null value for that field needs to be serialized as a JSON object whose key is the branch type name and whose value is the actual value. So, for a string value, it should look like this:
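Concretely, for your ["null", "string"] field, the two cases would be encoded like this (using the field1 name from your messages):

```json
{ "field1": { "string": "abcd" } }
```

while a null value stays plain:

```json
{ "field1": null }
```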
Using Avro with Pub/Sub in the way you want (being able to publish both { "field1":"abcd" } and { "field1":null } as plain JSON) is not possible without additional processing on your end.
As a workaround, you could handle the serialization and deserialization yourself in your publishing and subscribing code. When you’re publishing a message, you could check if the value of field1 is null or a string, and then serialize it in the way Avro expects:
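A minimal sketch of that publish-side check, assuming JSON message encoding on the topic (the helper name encodeField1 is illustrative, not from any library):

```javascript
// Wrap a nullable string for Avro's JSON union encoding.
function encodeField1(value) {
  // Non-null union values are wrapped in an object keyed by the
  // branch type name; null is written as plain null.
  return value === null ? null : { string: value };
}

// Build the message body before handing it to the Pub/Sub publisher.
const withValue = JSON.stringify({ field1: encodeField1("abcd") });
const withNull = JSON.stringify({ field1: encodeField1(null) });
// withValue: '{"field1":{"string":"abcd"}}'
// withNull:  '{"field1":null}'
```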
Then, when you’re receiving a message, you could check if the value of field1 is null or a JSON object, and then deserialize it to the format you want:
```javascript
let value;
if (message.field1 == null) {
  value = null;
} else {
  // Non-null union values arrive wrapped as { "string": "..." }.
  value = message.field1.string;
}
```
This workaround would allow you to publish and receive messages in the format you want, but it does add some complexity to your code.
Is there any official documentation for Google Pub/Sub Avro schemas? I am currently migrating from Kafka to Google Pub/Sub, so I keep running into these kinds of challenges. Which Avro schemas does Google Pub/Sub support?
It would be very helpful to have a documentation link that I can refer to directly during development, because there is a big difference between the Avro schemas supported by Kafka and those supported by Google Pub/Sub.
There are some key differences between Kafka and Pub/Sub:
Schema Resolution and Evolution: Both Kafka (via Confluent’s Schema Registry) and Pub/Sub allow for schema evolution, that is, the ability to change the schema over time in a compatible way. This means you can add fields, remove fields, or change the type of fields in your Avro schema and still be able to read data written with the old schema.
Schema Enforcement: Kafka does not enforce schemas directly; instead, it relies on the Schema Registry tool provided by Confluent. The Schema Registry API maintains a versioned history of all schemas and allows for the evolution of schemas according to the configured compatibility settings. In contrast, Pub/Sub has built-in schema enforcement. When a schema is associated with a Pub/Sub topic, all messages published to that topic must conform to the schema, and messages that do not conform are not published.
Schema Storage: In the context of Kafka and the Schema Registry, the schema is stored in a separate Kafka topic, and each schema is identified by a unique ID. This contrasts with Avro’s standard method, where the schema is typically stored with the data.
Avro Language Support: Pub/Sub uses the standard Avro JSON format for defining schemas. On the other hand, Kafka, in conjunction with the Confluent platform, can leverage Avro IDL (Interface Description Language), which provides a higher-level, more programmer-friendly language to define Avro schemas. It’s important to note that Avro IDL is not JSON, and it is more powerful and expressive than standard Avro JSON.
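The built-in enforcement flow looks roughly like this with the gcloud CLI (a sketch; the schema, file, and topic names are placeholders):

```shell
# Register the Avro schema definition with Pub/Sub.
gcloud pubsub schemas create my-avro-schema \
    --type=avro \
    --definition-file=schema.avsc

# Attach the schema to a topic; every publish is then validated against it.
gcloud pubsub topics create my-topic \
    --schema=my-avro-schema \
    --message-encoding=json
```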
You can find the official documentation for Pub/Sub’s support of Avro schemas in the Google Cloud docs at https://cloud.google.com/pubsub/docs/schemas.
If you want to make the “method” field nullable in your Avro schema, you need to use a union type that includes both “null” and your enum type. Here’s how you can modify your example to allow the “method” field to be either one of your enum values or null:
```json
{
  "name": "method",
  "type": ["null", {
    "type": "enum",
    "name": "MethodEnum",
    "symbols": ["GET", "POST", "PUT", "DELETE"]
  }]
}
```
In this example, the “method” field can be either “GET”, “POST”, “PUT”, “DELETE”, or null. When you’re creating your Avro data, if you want to set the “method” field to null, you can simply assign it the value null:
```json
{ "method": null }
```
And if you want to set it to one of your enum values, remember that Avro’s JSON encoding tags non-null union branches with the type name, so the value is wrapped in an object keyed by the enum’s name:

```json
{ "method": { "MethodEnum": "GET" } }
```
Please Note: you’ll need to serialize this data into a string before you publish it to Pub/Sub, and then deserialize it when you receive the message. The application that’s consuming the messages also needs to be aware of this schema in order to properly interpret the data.
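That serialize/deserialize round trip can be sketched like this, assuming JSON message encoding and the nullable enum schema above (the helper names are illustrative, not from any library):

```javascript
// Tag a non-null enum value with its union branch name "MethodEnum".
function encodeMethod(method) {
  return method === null ? null : { MethodEnum: method };
}

// Unwrap the union on the consuming side.
function decodeMethod(field) {
  return field === null ? null : field.MethodEnum;
}

// Round trip: build the string to publish, then read it back.
const published = JSON.stringify({ method: encodeMethod("GET") });
const received = decodeMethod(JSON.parse(published).method);
// received === "GET"
```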