Avro Support in Dagli

Dagli supports convenient reading and manipulation of Avro objects. There are two scenarios:

  • You know the Avro objects' schema(s) at compile time
  • You want to read data with arbitrary schemas not known at compile time

Schema is known at compile time

When the schema is known at compile time, Avro can generate a Java class that represents your schema. If your build tools do not support automating this, you can generate the Java class "manually" using Avro's CLI.

Now, to read the Avro objects, just use AvroReader, defined in the objectio/avro module; in this example, we'll assume the class generated from the schema is User:

try (AvroReader<User> reader = new AvroReader<>(User.class, avroPath)) {
	try (ObjectIterator<User> iter = reader.iterator()) {
		while (iter.hasNext()) {
			User user = iter.next();
			// do something with user
		}
	}
}

Because AvroReader is also an ObjectReader, you can pass it directly to Dagli as input data for training your model, e.g.

try (AvroReader<User> userData = new AvroReader<>(User.class, path)) {
	DAG1x1.Prepared<User, String> myPreparedDAG = myDAG.prepare(userData);
}

Manipulating Avro Objects in your DAG

So now you're able to feed an Avro object--in our running example, User--into your DAG, but how do you access specific fields? While you could of course write your own transformers to do this, that's a pain. Instead, you'll want to create a @Struct from your Avro class: @Structs are classes generated by Dagli that automatically come with field-access transformers (and other useful things, like a transformer that creates a new instance from its fields). @Structs require the Dagli annotation processor (as explained in the @Struct documentation), but once that's set up, all you need to do is create a @Struct definition class like so:

@Struct("UserStruct")
class UserStructBase extends User { }

This will create a new @Struct called UserStruct that extends the Avro-generated User class. UserStruct can now be used instead of User. Here's a simple example that creates a DAG that pulls out the name field from User objects, then applies it to an Avro file to get all the resultant names:

Placeholder<UserStruct> placeholder = new Placeholder<>();
UserStruct.Name name = new UserStruct.Name().withInput(placeholder);

try (AvroReader<UserStruct> userData = new AvroReader<>(UserStruct.class, avroPath)) {
	Result<CharSequence> names = DAG.Prepared.withPlaceholder(placeholder).withOutput(name).applyAll(userData);
	// do something with the names
}

Schema is not known at compile time

Just read your Avro records as GenericRecords, like so:

try (AvroReader<GenericRecord> reader = new AvroReader<>(GenericRecord.class, avroPath)) {
	try (ObjectIterator<GenericRecord> iter = reader.iterator()) {
		while (iter.hasNext()) {
			GenericRecord record = iter.next();
			// do something with record
		}
	}
}

Notice this is just like our previous example where we had a pre-generated User class, except User has been replaced by GenericRecord.

Accessing GenericRecord Fields in your DAG

As before, you could simply write your own transformers, but Dagli also makes available (in the dagli/avro module) a convenient, generic way to access your GenericRecord fields; here is an example that extracts the "name" field from some generic Avro objects:

Placeholder<GenericRecord> placeholder = new Placeholder<>();
AvroField<CharSequence> name = new AvroField<>().withFieldType(CharSequence.class).withFieldName("name").withInput(placeholder);

try (AvroReader<GenericRecord> records = new AvroReader<>(GenericRecord.class, avroPath)) {
	Result<CharSequence> names = DAG.Prepared.withPlaceholder(placeholder).withOutput(name).applyAll(records);
	// do something with the names
}

Here, we define an AvroField transformer that pulls out a field called "name" of type CharSequence (note that Avro, by default, stores text fields as Utf8, which implements CharSequence, rather than String). The field's type is verified at run-time, and an exception is thrown if "name" is not actually a CharSequence (nulls are fine).
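The Utf8-vs-String distinction is easy to trip over when comparing or collecting field values. As a quick self-contained illustration (using StringBuilder as a stand-in for Avro's Utf8, since both are CharSequences that are not Strings), String.equals will not match a non-String CharSequence:

```java
public class CharSequenceDemo {
	public static void main(String[] args) {
		// Stand-in for Avro's Utf8: a CharSequence that is not a String
		CharSequence name = new StringBuilder("Alice");

		System.out.println("Alice".equals(name));            // false: different classes
		System.out.println("Alice".contentEquals(name));     // true: compares characters
		System.out.println("Alice".equals(name.toString())); // true after explicit conversion
	}
}
```

The same caution applies to values obtained via AvroField<CharSequence>: call toString() (or use contentEquals) before comparing them against String constants.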

Writing Avro

Avro files can be written using AvroWriter, an implementation of the ObjectWriter interface that writes Avro records, either specific record types generated from an Avro schema (as discussed above) or generic records.

Example

Writing an Avro file is easy: create the writer, passing either the specific record class or GenericRecord.class, and start writing records. When you're done, close the writer. Records can be read back using the reader obtained by calling avroWriter.createReader(), or by creating a new AvroReader for the file.

AvroWriter<User> avroWriter = new AvroWriter<>(User.class, avroPath);
avroWriter.write(someUserRecord);
avroWriter.write(anotherUserRecord);
...
avroWriter.close(); // the writer must be closed before any reading takes place

// now we can get the reader to read back the written records
AvroReader<User> avroReader1 = avroWriter.createReader();

// or create the reader directly from the file (the readers obtained via either method are equivalent)
AvroReader<User> avroReader2 = new AvroReader<>(User.class, avroPath);

Appending to Existing Files

AvroWriters can append to existing Avro files (of the same record type, of course). This happens automatically when the path passed to the constructor already exists: in the example above, if avroPath already exists, someUserRecord and anotherUserRecord will be appended to the records already in that file.

Specifying the Record Count

When creating an AvroWriter, it's possible to provide a third argument with the number of records you will write, like so:

AvroWriter<User> avroWriter = new AvroWriter<>(User.class, avroPath, 1000);

The record count is written to a special metadata field in the Avro file, which gives subsequent AvroReader instances a very fast way to determine the number of records in the file (without this metadata, a reader must scan through the entire file to count them). Due to limitations of Avro, however, this comes with a number of restrictions:

  • An exception will be thrown if you try to specify a record count when appending to an existing file.
  • You must write exactly this many records before closing the writer, or an exception will be thrown.
  • You will not be able to append to this file later.