Apache Avro 序列化与反序列化 (Java 实现)

像两个交流一样要找一个互相能理解的语言, 在国内为普通话, 跑国外多用英语相通, 两个进程间通信也需要找一个大家都能理解的数据格式. 简单的如 JSON, XML, 那是自我描述性格式, XML 有 Schema 定义, 但尚无正式的 JSON Schema 规范. 在讲求效率的场合, 纯文本式的数据交换格式无法满足要求, 于是有二进制的 Google Protobuf 和 Apache Avro. 在 Apache 的生态像 Hadoop, Kafka 中自然是选用 Avro.

Avro 支持多种语言, 如 C, C++, C#, Java, PHP, Python 和 Ruby. 它使用 JSON 来定义 Schema, 通过工具可以由 Schema 生成相应语言的数据对象, 比如 Java 的  avro-tools.jar. 这样可以在跨进程跨语言透明的实现为对象交换.

本文体验 Java 环境中 Avro 数据格式的序列化与反序列化.

Avro Schema 文件就是数据生产和消费端的通信协议; 我们可以由 Schema 生成相应的 Java 对象, 然后以具体的 Java 对象交换, 或者不生成 Java 对象而纯粹以 GenericRecord 交互. 为操作数据的简单, 我们通常采用前一种方式, 即生成具体数据传输对象.

首先定义一个 Schema

{
  "namespace": "cc.unmi.data",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "address", "type": ["string", "null"]}
  ]
}

对于 Schema 不多加说明, 这里只定义了一个 User 对象, 有两个属性 name 和  address. Schema 的详细解释可打开 http://avro.apache.org/docs/1.8.1/spec.html.

假设文件名为 user.avsc, avsc 应该是 Avro Schema 文件, 我至今都未查到 Avro 是什么的缩写.

由 Schema 生成 Java 对象

我们需要用到 avro-tools-1.x.x.jar 工具包, 当前版本是 1.8.1, 命令格式是

java -jar /path/to/avro-tools-1.8.1.jar compile schema user.avsc .

上面命令会在当前目录生成 cc/unmi/data/User.java 文件. 下面的例子会使用 org.apache.avro:avro-maven-plugin 来从 Schema 生成 Java 对象.

可以大致看一下生成的 User.java 的片断

@org.apache.avro.specific.AvroGenerated
public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = 3019453098083125873L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\"....");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
............

生成的对象中包含完整的 Schema 定义内容, 可由静态方法 getClassSchema() 和实例方法 getSchema() 获得相应的 Schema, 所以拥有了这个对象类通信时就不再需要 user.avsc 文件了. 在它的父类 SpecificRecordBase 类中定义了抽象方法 getSchema().

并且这个类提供了多种方式来创建一个实例

  1. User user = new User(); user.setName("Yanbin")..., user.put(2, "Chicago")..., user.put("name", "Qiu")
  2. User user = new User("Yanbin", "Chicago")
  3. User user = User.newBuilder().setName("Yanbin").setAddress("Chicago").build()

序列化

下面的代码把一个 User 对象序列化为字节数组, 也可以序列化为外部文件

private static byte[] serializeUser(User user) throws IOException {
    DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
    userDatumWriter.write(user, binaryEncoder);
    return outputStream.toByteArray();
}

光有序列化代码无法验证序列化后的数据是否正确, 于是要有下面的反序列化代码

反序列化

private static User deserializeUser(byte[] data) throws IOException {
    DatumReader<User> userDatumReader = new SpecificDatumReader<>(User.class);
    BinaryDecoder binaryEncoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(data), null);
    return userDatumReader.read(new User(), binaryEncoder);
}

从上面方法输出的字节数组中反序列化出相等的对象来,  userDatumReader.read(new User(), binaryEncoder) 执行后的返回值与被更新后的第一个参数是一样的, 所以这个方法要是能写成 reutnr userDatumReader.read(User.class, binaryEncoder); 会好看些.

有了上面的两个方法需要串联起来, 序列化的输出作为反序化的输出就能能证明两个操作是否正确

验证序列化与反序列化

public static void main(String[] args) throws IOException {
    User originalUser =  new User("Yanbin", "Chicago");
    User deserializedUser = deserializeUser(serializeUser(originalUser));
    System.out.println("Same object? " + (deserializedUser == originalUser));
    System.out.println("Objects equal? " + (deserializedUser.equals(originalUser)));
    System.out.println("All fields: " + deserializedUser);
}

执行输出结果如下

Same object? false
Objects equal? true
All fields: {"name": "Yanbin", "address": "Chicago"}

准确无误, 大功告成

本例实作是一个 Maven  项目, pom.xml 文件内容如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cc.unmi</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Apache Avro Demo</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

user.avsc 生成 Java 对象是挂在 generate-sources 阶段执行的, 所以在 mvn compile 时会生成 User.java 文件.

完整的项目文件在 GitHub 上 https://github.com/yabqiu/apache-avro-demo.

相关链接:

  1. Apache Avro™ 1.8.1 Getting Started (Java)
  2. Avro序列化方法
  3. Apache Avro使用入指南

类别: Java/JEE. 标签: . 阅读(104). 订阅评论. TrackBack.

Leave a Reply

Be the First to Comment!

avatar
wpDiscuz