Skip to content

Commit

Permalink
Add support for autoregister schemas in the json serde (Apicurio#4127)
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Dec 20, 2023
1 parent 78ab748 commit 627a9ee
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,54 @@ public void testJsonSchemaSerde() throws Exception {
}
}

@Test
public void testJsonSchemaSerdeAutoRegister() throws Exception {
String groupId = TestUtils.generateGroupId();
String artifactId = generateArtifactId();

Person person = new Person("Carles", "Arnal", 30);

try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {

Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
config.put(SerdeConfig.SCHEMA_LOCATION, "/io/apicurio/registry/util/json-schema.json");
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
serializer.configure(config, false);

deserializer.configure(Collections.emptyMap(), false);

Headers headers = new RecordHeaders();
byte[] bytes = serializer.serialize(artifactId, headers, person);

person = deserializer.deserialize(artifactId, headers, bytes);

Assertions.assertEquals("Carles", person.getFirstName());
Assertions.assertEquals("Arnal", person.getLastName());
Assertions.assertEquals(30, person.getAge());

person.setAge(-1);

try {
serializer.serialize(artifactId, new RecordHeaders(), person);
Assertions.fail();
} catch (Exception ignored) {
}

serializer.setValidationEnabled(false); // disable validation
// create invalid person bytes
bytes = serializer.serialize(artifactId, headers, person);

try {
deserializer.deserialize(artifactId, headers, bytes);
Assertions.fail();
} catch (Exception ignored) {
}
}
}

@Test
public void testJsonSchemaSerdeHeaders() throws Exception {
InputStream jsonSchema = getClass().getResourceAsStream("/io/apicurio/registry/util/json-schema.json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,23 @@ private Optional<SchemaLookupResult<S>> getSchemaFromCache(ArtifactReference art

private SchemaLookupResult<S> getSchemaFromRegistry(ParsedSchema<S> parsedSchema, Record<T> data, ArtifactReference artifactReference) {

if (autoCreateArtifact && schemaParser.supportsExtractSchemaFromData()) {
if (parsedSchema == null) {
parsedSchema = schemaParser.getSchemaFromData(data, dereference);
}

if (parsedSchema.hasReferences()) {
//List of references lookup, to be used to create the references for the artifact
final List<SchemaLookupResult<S>> schemaLookupResults = handleArtifactReferences(data, parsedSchema);
return handleAutoCreateArtifact(parsedSchema, artifactReference, schemaLookupResults);
} else {
if (autoCreateArtifact) {

if (schemaParser.supportsExtractSchemaFromData()) {

if (parsedSchema == null) {
parsedSchema = schemaParser.getSchemaFromData(data, dereference);
}

if (parsedSchema.hasReferences()) {
//List of references lookup, to be used to create the references for the artifact
final List<SchemaLookupResult<S>> schemaLookupResults = handleArtifactReferences(data, parsedSchema);
return handleAutoCreateArtifact(parsedSchema, artifactReference, schemaLookupResults);
} else {
return handleAutoCreateArtifact(parsedSchema, artifactReference);
}
} else if (config.getExplicitSchemaLocation() != null && schemaParser.supportsGetSchemaFromLocation()) {
parsedSchema = schemaParser.getSchemaFromLocation(config.getExplicitSchemaLocation());
return handleAutoCreateArtifact(parsedSchema, artifactReference);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,27 @@ public interface SchemaParser<S, U> {
*/
ParsedSchema<S> getSchemaFromData(Record<U> data, boolean dereference);

/**
* In some artifact types, such as Json, we allow defining a local place for the schema.
*
* @param location the schema location
* @return the ParsedSchema, containing both the raw schema (bytes) and the parsed schema. Can be null.
*/
default ParsedSchema<S> getSchemaFromLocation(String location) {
return null;
}

/**
* Flag that indicates if {@link SchemaParser#getSchemaFromData(Record)} is implemented or not.
*/
default boolean supportsExtractSchemaFromData() {
return true;
}

/**
* Flag that indicates if {@link SchemaParser#getSchemaFromLocation(String)} is implemented or not.
*/
default boolean supportsGetSchemaFromLocation() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public class SchemaResolverConfig {
*/
public static final String EXPLICIT_ARTIFACT_ID = "apicurio.registry.artifact.artifact-id";

/**
* Only applicable for serializers
* Optional, set explicitly the schema location in the classpath for the schema to be used for serializing the data.
*/
public static final String SCHEMA_LOCATION = "apicurio.registry.artifact.schema.location";

/**
* Only applicable for serializers
* Optional, set explicitly the version used for querying/creating an artifact.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public String getExplicitArtifactId() {
return getString(EXPLICIT_ARTIFACT_ID);
}

public String getExplicitSchemaLocation() {
return getString(SCHEMA_LOCATION);
}

public String getExplicitArtifactVersion() {
return getString(EXPLICIT_ARTIFACT_VERSION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public SchemaParser<JsonSchema, T> schemaParser() {
*/
@Override
protected void serializeData(ParsedSchema<JsonSchema> schema, T data, OutputStream out) throws IOException {
//TODO add property to specify a jsonschema to allow for auto-register json schemas
serializeData(null, schema, data, out);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
public class JsonSchemaKafkaSerializerConfig extends BaseKafkaSerDeConfig {

private static ConfigDef configDef() {
ConfigDef configDef = new ConfigDef()
return new ConfigDef()
.define(VALIDATION_ENABLED, Type.BOOLEAN, VALIDATION_ENABLED_DEFAULT, Importance.MEDIUM, "Whether to validate the data against the json schema");
return configDef;
}

/**
Expand All @@ -30,5 +29,4 @@ public JsonSchemaKafkaSerializerConfig(Map<?, ?> originals) {
public boolean validationEnabled() {
return this.getBoolean(VALIDATION_ENABLED);
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.apicurio.registry.serde.jsonschema;

import io.apicurio.registry.resolver.ParsedSchema;
import io.apicurio.registry.resolver.ParsedSchemaImpl;
import io.apicurio.registry.resolver.SchemaParser;
import io.apicurio.registry.resolver.data.Record;
import io.apicurio.registry.types.ArtifactType;
Expand Down Expand Up @@ -28,11 +29,6 @@ public JsonSchema parseSchema(byte[] rawSchema, Map<String, ParsedSchema<JsonSch
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getParsedSchema())), 0);
}

//TODO we could implement some way of providing the jsonschema beforehand:
// - via annotation in the object being serialized
// - via config property
//if we do this users will be able to automatically registering the schema when using this serde

/**
* @see io.apicurio.registry.resolver.SchemaParser#getSchemaFromData(java.lang.Object)
*/
Expand All @@ -48,8 +44,22 @@ public ParsedSchema<JsonSchema> getSchemaFromData(Record<T> data, boolean derefe
return null;
}

@Override
public ParsedSchema<JsonSchema> getSchemaFromLocation(String location) {
String rawSchema = IoUtil.toString(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));

return new ParsedSchemaImpl<JsonSchema>()
.setParsedSchema(new JsonSchema(rawSchema))
.setRawSchema(rawSchema.getBytes());
}

@Override
public boolean supportsExtractSchemaFromData() {
return false;
}

@Override
public boolean supportsGetSchemaFromLocation() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public class SerdeConfig {
*/
public static final String EXPLICIT_ARTIFACT_ID = SchemaResolverConfig.EXPLICIT_ARTIFACT_ID;

/**
* Only applicable for serializers
* Optional, set explicitly the schema used for serialization.
*/
public static final String SCHEMA_LOCATION = SchemaResolverConfig.SCHEMA_LOCATION;

/**
* Only applicable for serializers
* Optional, set explicitly the version used for querying/creating an artifact.
Expand Down

0 comments on commit 627a9ee

Please sign in to comment.