Skip to content

Commit

Permalink
Handle topics using identical schemas (waldophotos#53)
Browse files Browse the repository at this point in the history
* Add failing test

* Handle topics which use identical schemas
  • Loading branch information
scottwd9 authored and ricardohbin committed Nov 9, 2018
1 parent 898a3e4 commit 487a04a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ You can use `docker-compose up` to up all the stack before you call your integra

## Release History

- **1.1.2**, *08 November 2018*
- Handle topics which use identical schemas (by [scottwd9](https://github.com/scottwd9))
- **1.1.1**, *23 August 2018*
- Set `schemaMeta` for key schemas also (by [eparreno](https://github.com/eparreno))
- **1.1.0**, *06 August 2018*
Expand Down
20 changes: 18 additions & 2 deletions lib/schema-registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ var SchemaRegistry = module.exports = cip.extend(function(opts) {
this.schemaTypeById = {};
});

/**
* Get the avro RecordType from a schema response.
*
* @return {RecordType} An avro RecordType representing the parsed avro schema.
*/
function typeFromSchemaResponse(schema, parseOptions) {
var schemaType = avro.parse(schema);

//check if the schema has been previouisly parsed and added to the registry
if(typeof parseOptions.registry === 'object' && typeof parseOptions.registry[schemaType.name] !== 'undefined'){
return parseOptions.registry[schemaType.name];
}

return avro.parse(schema, parseOptions);
}

/**
* Initialize the library, fetch schemas and register them locally.
*
Expand Down Expand Up @@ -323,7 +339,7 @@ SchemaRegistry.prototype._registerSchema = Promise.method(function (schemaObj) {
schemaObj.topic);

try {
schemaObj.type = avro.parse(schemaObj.responseRaw.schema, this.parseOptions);
schemaObj.type = typeFromSchemaResponse(schemaObj.responseRaw.schema, this.parseOptions);
} catch(ex) {
log.warn('_registerSchema() :: Error parsing schema:',
schemaObj.schemaTopicRaw, 'Error:', ex.message, 'Moving on...');
Expand Down Expand Up @@ -352,7 +368,7 @@ SchemaRegistry.prototype._registerSchemaLatest = Promise.method(function (schema
schemaObj.topic);

try {
schemaObj.type = avro.parse(schemaObj.responseRaw.schema, this.parseOptions);
schemaObj.type = typeFromSchemaResponse(schemaObj.responseRaw.schema, this.parseOptions);
} catch (ex) {
log.warn('_registerSchemaLatest() :: Error parsing schema:',
schemaObj.schemaTopicRaw, 'Error:', ex.message, 'Moving on...');
Expand Down
2 changes: 2 additions & 0 deletions test/lib/test.lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ testLib.KAFKA_BROKER_URL = 'kafka:9092';

testLib.topic = schemaFix.name;
testLib.topicTwo = schemaTwoFix.name;
testLib.topicThreeWithDuplicateSchema = schemaFix.name + '-duplicateSchema';

var testBoot = false;

Expand Down Expand Up @@ -62,6 +63,7 @@ testLib.init = function() {
return Promise.all([
testLib.registerSchema(testLib.topic, schemaFix),
testLib.registerSchema(testLib.topicTwo, schemaTwoFix),
testLib.registerSchema(testLib.topicThreeWithDuplicateSchema, schemaFix),
]);
});

Expand Down

0 comments on commit 487a04a

Please sign in to comment.