Kafka Log Storage (Part 3): Compressor
Published: 2019-03-01


An Analysis of Kafka's Message Compression and Iteration Mechanism

1. Overview of the Compressor Class

The Compressor class handles message compression. In Kafka, a producer may compress messages before sending them in order to reduce the network transmission load. The Compressor constructor initializes the relevant state: the compression type, the initial buffer position, the record count, the number of uncompressed bytes written, and the compression rate.
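
For context, producer-side compression is switched on through the compression.type configuration. A minimal sketch (the broker address, topic name, and serializers below are illustrative, not from this article):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // One of: none, gzip, snappy, lz4 (in this Kafka generation)
        props.put("compression.type", "gzip");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Batches are compressed by the Compressor before they leave the client.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}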

Constructor Analysis

public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type;
    this.initPos = buffer.position();

    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;

    if (type != CompressionType.NONE) {
        // For a compressed set, skip over the wrapper message's header
        // (offset + size + record overhead); close() comes back to fill it in.
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }

    // appendStream wraps the buffer-backed stream with the chosen codec.
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}
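
The appendStream returned by wrapForOutput is what actually applies the codec. In Kafka the real method builds the Snappy and LZ4 streams via reflection so that those libraries stay optional dependencies; a simplified sketch of the idea, covering only the two cases that need no extra dependency:

import java.io.DataOutputStream;
import java.util.zip.GZIPOutputStream;

// Simplified sketch: the real Kafka method also handles SNAPPY and LZ4,
// constructing those streams reflectively to keep the libraries optional.
static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer,
                                      CompressionType type, int bufferSize) {
    try {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            default:
                throw new KafkaException("Codec not covered by this sketch: " + type);
        }
    } catch (Exception e) {
        throw new KafkaException(e);
    }
}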

The close() Method

public void close() {
    try {
        appendStream.close();
    } catch (IOException e) {
        throw new KafkaException(e);
    }

    if (type != CompressionType.NONE) {
        ByteBuffer buffer = bufferStream.buffer();
        int pos = buffer.position();

        // Go back and write the wrapper header: the offset field holds
        // numRecords - 1, the relative offset of the last inner message.
        buffer.position(initPos);
        buffer.putLong(numRecords - 1);
        buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);

        // Write the wrapper record itself; its value size and CRC are
        // placeholders at this point and are patched in below.
        Record.write(buffer, maxTimestamp, null, null, type, 0, -1);

        // Patch in the value size (the length of the compressed payload).
        int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
        buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize);

        // Compute and patch in the CRC over everything after the CRC field.
        long crc = Record.computeChecksum(buffer,
            initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
            pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
        Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);

        // Restore the position and update the per-codec compression-rate estimate.
        buffer.position(pos);
        this.compressionRate = (float) buffer.position() / this.writtenUncompressed;
        TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
            compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
    }
}
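
The last two statements fold the batch's observed ratio into TYPE_TO_RATE, a per-codec estimate of the achievable compression rate maintained as an exponentially damped moving average (the producer uses this estimate when sizing future batches). Isolated as a standalone sketch; the 0.9 damping factor mirrors Kafka's COMPRESSION_RATE_DAMPING_FACTOR as I read it:

// Each close() blends the newly observed ratio into the running estimate:
// heavy weight on history, light weight on the new sample.
static float dampedRate(float previousEstimate, float observedRate) {
    final float DAMPING_FACTOR = 0.9f;
    return previousEstimate * DAMPING_FACTOR + observedRate * (1 - DAMPING_FACTOR);
}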

2. Consumer Decompression Logic

Offset Assignment

  • When the producer creates messages, the offset it sets on each inner message is either an absolute value or a value relative to the wrapper, depending on the message format version.
  • The broker assigns the outer offset based on the number of compressed inner messages recorded in the wrapper.
  • The outer (wrapper) message's offset is the absolute offset of the last inner message; see the worked example below.
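
A hypothetical worked example: a wrapper holds three inner messages with relative offsets 0, 1 and 2, and the broker appends it at a point where the partition's next offset is 100. The broker writes 102, the absolute offset of the last inner message, into the wrapper header; during decompression the consumer computes absoluteBaseOffset = 102 - 2 = 100, so the inner messages resolve to absolute offsets 100, 101 and 102.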

The Decompression Process

  • After the consumer fetches compressed messages, it iterates over the MemoryRecords.
  • The MemoryRecords iterator extends AbstractIterator, which implements the standard Iterator interface with its hasNext() and next() methods.
  • The iterator's state machine has four states: READY, NOT_READY, DONE, and FAILED.

3. Iterator Implementation

AbstractIterator

import java.util.Iterator;
import java.util.NoSuchElementException;

public abstract class AbstractIterator<T> implements Iterator<T> {

    private enum State { READY, NOT_READY, DONE, FAILED }

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case DONE: return false;
            case READY: return true;
            default: return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        // Hand out the element computed by makeNext() and reset the state.
        state = State.NOT_READY;
        if (next == null) throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    @Override
    public void remove() { throw new UnsupportedOperationException("Removal not supported"); }

    public T peek() {
        if (!hasNext()) throw new NoSuchElementException();
        return next;
    }

    // Subclasses return allDone() from makeNext() to signal exhaustion.
    protected T allDone() {
        state = State.DONE;
        return null;
    }

    protected abstract T makeNext();

    private boolean maybeComputeNext() {
        // Enter FAILED first: if makeNext() throws, the iterator stays failed.
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE) return false;
        state = State.READY;
        return true;
    }
}
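
To see the template at work, here is a toy subclass (my own illustration, not from Kafka): makeNext() either produces the next element or returns allDone() to flip the state machine to DONE.

public class RangeIterator extends AbstractIterator<Integer> {
    private final int end;
    private int current;

    public RangeIterator(int start, int end) {
        this.current = start;
        this.end = end;
    }

    @Override
    protected Integer makeNext() {
        // Yields start, start + 1, ..., end - 1, then signals exhaustion.
        return current < end ? current++ : allDone();
    }
}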

RecordsIterator

public static class RecordsIterator extends AbstractIterator<LogEntry> {
    private final ByteBuffer buffer;
    private final DataInputStream stream;
    private final CompressionType type;
    private final boolean shallow;
    private RecordsIterator innerIter;
    private final ArrayDeque<LogEntry> logEntries;
    private final long absoluteBaseOffset;

    // Deep-iterator constructor: iterates the inner messages of one
    // compressed wrapper entry.
    public RecordsIterator(LogEntry entry) {
        this.type = entry.record().compressionType();
        this.buffer = entry.record().value();
        this.shallow = true;
        this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
        long wrapperRecordOffset = entry.offset();
        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
            // Message format v1: inner offsets are relative, so decompress
            // everything up front to be able to compute absolute offsets.
            this.logEntries = new ArrayDeque<>();
            long wrapperRecordTimestamp = entry.record().timestamp();
            while (true) {
                try {
                    LogEntry logEntry = getNextEntryFromStream();
                    Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                            wrapperRecordTimestamp, entry.record().timestampType());
                    logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
                } catch (EOFException e) {
                    break;
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            }
            // The wrapper's offset is the absolute offset of the last inner message.
            this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
        } else {
            this.logEntries = null;
            this.absoluteBaseOffset = -1;
        }
    }

    @Override
    protected LogEntry makeNext() {
        if (innerDone()) {
            try {
                LogEntry entry = getNextEntry();
                if (entry == null)
                    return allDone();
                // Convert relative inner offsets to absolute ones.
                if (absoluteBaseOffset >= 0) {
                    long absoluteOffset = absoluteBaseOffset + entry.offset();
                    entry = new LogEntry(absoluteOffset, entry.record());
                }
                CompressionType compression = entry.record().compressionType();
                if (compression == CompressionType.NONE || shallow) {
                    return entry;
                } else {
                    // Compressed wrapper: descend with a deep (inner) iterator.
                    innerIter = new RecordsIterator(entry);
                    return innerIter.next();
                }
            } catch (EOFException e) {
                return allDone();
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        } else {
            return innerIter.next();
        }
    }

    private boolean innerDone() {
        return innerIter == null || !innerIter.hasNext();
    }

    private LogEntry getNextEntry() throws IOException {
        // v1 inner entries were pre-read into logEntries; otherwise read the stream.
        if (logEntries != null)
            return logEntries.isEmpty() ? null : logEntries.remove();
        else
            return getNextEntryFromStream();
    }

    private LogEntry getNextEntryFromStream() throws IOException {
        long offset = stream.readLong();
        int size = stream.readInt();
        if (size < 0)
            throw new IllegalStateException("Record with size " + size);
        ByteBuffer rec;
        if (type == CompressionType.NONE) {
            // Uncompressed: slice the record straight out of the buffer.
            rec = buffer.slice();
            int newPos = buffer.position() + size;
            if (newPos > buffer.limit())
                return null;
            buffer.position(newPos);
            rec.limit(size);
        } else {
            // Compressed: copy the decompressed bytes out of the stream.
            byte[] recordBuffer = new byte[size];
            stream.readFully(recordBuffer, 0, size);
            rec = ByteBuffer.wrap(recordBuffer);
        }
        return new LogEntry(offset, new Record(rec));
    }
}

4. The Iteration Process

  • Shallow iteration

    • Create the shallow iterator and call its next() method.
    • In makeNext(), innerDone() reports whether a deep iteration is in progress; if none is (or the previous one has finished), the next outer message is read via getNextEntryFromStream().
  • Deep iteration

    • Inspect the message's compression type; if it is compressed, create a deep (inner) iterator for it.
    • The inner iterator decompresses the wrapper and reads the inner messages through getNextEntryFromStream().
  • Returning messages

    • The shallow flag determines whether the wrapper message itself or the decompressed inner messages are returned.

Through this mechanism, Kafka compresses and decompresses messages efficiently, keeping message transfer both fast and reliable.

