Skip to content

Commit

Permalink
AVRO-1695: Ruby support for logical types
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Perkins authored and tjwp committed Nov 7, 2018
1 parent a318a34 commit b409d89
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 22 deletions.
2 changes: 2 additions & 0 deletions lang/ruby/Manifest
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ lib/avro.rb
lib/avro/data_file.rb
lib/avro/io.rb
lib/avro/ipc.rb
lib/avro/logical_types.rb
lib/avro/protocol.rb
lib/avro/schema.rb
lib/avro/schema_compatibility.rb
Expand All @@ -24,6 +25,7 @@ test/test_datafile.rb
test/test_fingerprints.rb
test/test_help.rb
test/test_io.rb
test/test_logical_types.rb
test/test_protocol.rb
test/test_schema.rb
test/test_schema_compatibility.rb
Expand Down
10 changes: 7 additions & 3 deletions lang/ruby/lib/avro/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def read_data(writers_schema, readers_schema, decoder)

# function dispatch for reading data based on type of writer's
# schema
case writers_schema.type_sym
datum = case writers_schema.type_sym
when :null; decoder.read_null
when :boolean; decoder.read_boolean
when :string; decoder.read_string
Expand All @@ -272,6 +272,8 @@ def read_data(writers_schema, readers_schema, decoder)
else
raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
end

readers_schema.type_adapter.decode(datum)
end

def read_fixed(writers_schema, readers_schema, decoder)
Expand Down Expand Up @@ -499,8 +501,10 @@ def write(datum, encoder)
write_data(writers_schema, datum, encoder)
end

def write_data(writers_schema, datum, encoder)
unless Schema.validate(writers_schema, datum)
def write_data(writers_schema, logical_datum, encoder)
datum = writers_schema.type_adapter.encode(logical_datum)

unless Schema.validate(writers_schema, datum, encoded = true)
raise AvroTypeError.new(writers_schema, datum)
end

Expand Down
84 changes: 84 additions & 0 deletions lang/ruby/lib/avro/logical_types.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'date'

module Avro
module LogicalTypes
module IntDate
EPOCH_START = Date.new(1970, 1, 1)

def self.encode(date)
(date - EPOCH_START).to_i
end

def self.decode(int)
EPOCH_START + int
end
end

module TimestampMillis
def self.encode(value)
time = value.to_time
time.to_i * 1000 + time.usec / 1000
end

def self.decode(int)
s, ms = int / 1000, int % 1000
Time.at(s, ms * 1000).utc
end
end

module TimestampMicros
def self.encode(value)
time = value.to_time
time.to_i * 1000_000 + time.usec
end

def self.decode(int)
s, us = int / 1000_000, int % 1000_000
Time.at(s, us).utc
end
end

module Identity
def self.encode(datum)
datum
end

def self.decode(datum)
datum
end
end

TYPES = {
"int" => {
"date" => IntDate
},
"long" => {
"timestamp-millis" => TimestampMillis,
"timestamp-micros" => TimestampMicros
},
}.freeze

def self.type_adapter(type, logical_type)
return unless logical_type

TYPES.fetch(type, {}).fetch(logical_type, Identity)
end
end
end
38 changes: 25 additions & 13 deletions lang/ruby/lib/avro/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'avro/logical_types'

module Avro
class Schema
# Sets of strings, for backwards compatibility. See below for sets of symbols,
Expand All @@ -40,6 +42,7 @@ def self.parse(json_string)
def self.real_parse(json_obj, names=nil, default_namespace=nil)
if json_obj.is_a? Hash
type = json_obj['type']
logical_type = json_obj['logicalType']
raise SchemaParseError, %Q(No "type" property: #{json_obj}) if type.nil?

# Check that the type is valid before calling #to_sym, since symbols are never garbage
Expand All @@ -50,15 +53,15 @@ def self.real_parse(json_obj, names=nil, default_namespace=nil)

type_sym = type.to_sym
if PRIMITIVE_TYPES_SYM.include?(type_sym)
return PrimitiveSchema.new(type_sym)
return PrimitiveSchema.new(type_sym, logical_type)

elsif NAMED_TYPES_SYM.include? type_sym
name = json_obj['name']
namespace = json_obj.include?('namespace') ? json_obj['namespace'] : default_namespace
case type_sym
when :fixed
size = json_obj['size']
return FixedSchema.new(name, namespace, size, names)
return FixedSchema.new(name, namespace, size, names, logical_type)
when :enum
symbols = json_obj['symbols']
doc = json_obj['doc']
Expand Down Expand Up @@ -93,23 +96,29 @@ def self.real_parse(json_obj, names=nil, default_namespace=nil)
end

# Determine if a ruby datum is an instance of a schema
def self.validate(expected_schema, datum)
SchemaValidator.validate!(expected_schema, datum)
def self.validate(expected_schema, logical_datum, encoded = false)
SchemaValidator.validate!(expected_schema, logical_datum, encoded)
true
rescue SchemaValidator::ValidationError
false
end

def initialize(type)
def initialize(type, logical_type=nil)
@type_sym = type.is_a?(Symbol) ? type : type.to_sym
@logical_type = logical_type
end

attr_reader :type_sym
attr_reader :logical_type

# Returns the type as a string (rather than a symbol), for backwards compatibility.
# Deprecated in favor of {#type_sym}.
def type; @type_sym.to_s; end

def type_adapter
@type_adapter ||= LogicalTypes.type_adapter(type, logical_type) || LogicalTypes::Identity
end

# Returns the MD5 fingerprint of the schema as an Integer.
def md5_fingerprint
parsing_form = SchemaNormalization.to_parsing_form(self)
Expand Down Expand Up @@ -157,7 +166,9 @@ def subparse(json_obj, names=nil, namespace=nil)
end

def to_avro(names=nil)
{'type' => type}
props = {'type' => type}
props['logicalType'] = logical_type if logical_type
props
end

def to_s
Expand All @@ -166,8 +177,9 @@ def to_s

class NamedSchema < Schema
attr_reader :name, :namespace
def initialize(type, name, namespace=nil, names=nil, doc=nil)
super(type)

def initialize(type, name, namespace=nil, names=nil, doc=nil, logical_type=nil)
super(type, logical_type)
@name, @namespace = Name.extract_namespace(name, namespace)
@doc = doc
names = Name.add_name(names, self)
Expand Down Expand Up @@ -318,11 +330,11 @@ def to_avro(names=Set.new)

# Valid primitive types are in PRIMITIVE_TYPES.
class PrimitiveSchema < Schema
def initialize(type)
def initialize(type, logical_type=nil)
if PRIMITIVE_TYPES_SYM.include?(type)
super(type)
super(type, logical_type)
elsif PRIMITIVE_TYPES.include?(type)
super(type.to_sym)
super(type.to_sym, logical_type)
else
raise AvroError.new("#{type} is not a valid primitive type.")
end
Expand All @@ -336,12 +348,12 @@ def to_avro(names=nil)

class FixedSchema < NamedSchema
attr_reader :size
def initialize(name, space, size, names=nil)
def initialize(name, space, size, names=nil, logical_type=nil)
# Ensure valid cto args
unless size.is_a?(Integer)
raise AvroError, 'Fixed Schema requires a valid integer for size property.'
end
super(:fixed, name, space, names)
super(:fixed, name, space, names, logical_type)
@size = size
end

Expand Down
12 changes: 9 additions & 3 deletions lang/ruby/lib/avro/schema_validator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,22 @@ def to_s
TypeMismatchError = Class.new(ValidationError)

class << self
def validate!(expected_schema, datum)
def validate!(expected_schema, logical_datum, encoded = false)
result = Result.new
validate_recursive(expected_schema, datum, ROOT_IDENTIFIER, result)
validate_recursive(expected_schema, logical_datum, ROOT_IDENTIFIER, result, encoded)
fail ValidationError, result if result.failure?
result
end

private

def validate_recursive(expected_schema, datum, path, result)
def validate_recursive(expected_schema, logical_datum, path, result, encoded = false)
datum = if encoded
logical_datum
else
expected_schema.type_adapter.encode(logical_datum) rescue nil
end

case expected_schema.type_sym
when :null
fail TypeMismatchError unless datum.nil?
Expand Down
23 changes: 21 additions & 2 deletions lang/ruby/test/random_data.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ def next
end

def nextdata(schm, d=0)
return logical_nextdata(schm, d=0) unless schm.type_adapter.eql?(Avro::LogicalTypes::Identity)

case schm.type_sym
when :boolean
rand > 0.5
when :string
randstr()
when :int
rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
rand_int
when :long
rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
rand_long
when :float
(-1024 + 2048 * rand).round.to_f
when :double
Expand Down Expand Up @@ -79,6 +81,15 @@ def nextdata(schm, d=0)
end
end

def logical_nextdata(schm, _d=0)
case schm.logical_type
when 'date'
Avro::LogicalTypes::IntDate.decode(rand_int)
when 'timestamp-millis', 'timestamp-micros'
Avro::LogicalTypes::TimestampMicros.decode(rand_long)
end
end

CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789'
BYTEPOOL = '12345abcd'

Expand All @@ -87,4 +98,12 @@ def randstr(chars=CHARPOOL, length=20)
rand(length+1).times { str << chars[rand(chars.size)] }
str
end

def rand_int
rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
end

def rand_long
rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
end
end
14 changes: 13 additions & 1 deletion lang/ruby/test/test_io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ def test_record
check_default(record_schema, '{"f": 11}', {"f" => 11})
end

def test_record_with_logical_type
record_schema = <<EOS
{"type": "record",
"name": "Test",
"fields": [{"name": "ts",
"type": {"type": "long",
"logicalType": "timestamp-micros"}}]}
EOS
check(record_schema)
end

def test_error
error_schema = <<EOS
{"type": "error",
Expand Down Expand Up @@ -115,6 +126,7 @@ def test_recursive
def test_union
union_schema = <<EOS
["string",
{"type": "int", "logicalType": "date"},
"null",
"long",
{"type": "record",
Expand Down Expand Up @@ -451,7 +463,7 @@ def check(str)

def checkser(schm, randomdata)
datum = randomdata.next
assert validate(schm, datum)
assert validate(schm, datum), 'datum is not valid for schema'
w = Avro::IO::DatumWriter.new(schm)
writer = StringIO.new "", "w"
w.write(datum, Avro::IO::BinaryEncoder.new(writer))
Expand Down
Loading

0 comments on commit b409d89

Please sign in to comment.