The Compressor class handles message compression. In Kafka, the producer can compress messages before sending them to reduce network transfer overhead. The constructor initializes the relevant state: the compression type, the initial buffer position, the record count, the number of uncompressed bytes written, the estimated compression rate, and the maximum timestamp. It then wraps the underlying ByteBufferOutputStream in a compressing stream via wrapForOutput.
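As a rough illustration of what wrapForOutput does, here is a minimal sketch that picks a compressing wrapper based on the compression type. This is my own simplification, not Kafka's code: it covers only the NONE and GZIP cases with the JDK's standard streams, and the local Type enum stands in for Kafka's CompressionType (the real implementation also handles Snappy and LZ4, loaded reflectively so they stay optional dependencies).

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

public class CompressionStreams {
    // Stand-in for Kafka's CompressionType enum (NONE and GZIP only).
    enum Type { NONE, GZIP }

    // Sketch of a wrapForOutput-style helper: everything the Compressor
    // appends goes through this stream, so for compressed types the bytes
    // are encoded transparently as they flow into the underlying buffer.
    static DataOutputStream wrapForOutput(OutputStream out, Type type, int bufferSize)
            throws IOException {
        switch (type) {
            case GZIP:
                // GZIPOutputStream compresses everything written through it;
                // bufferSize controls the deflater's internal buffer.
                return new DataOutputStream(new GZIPOutputStream(out, bufferSize));
            case NONE:
            default:
                return new DataOutputStream(out);
        }
    }
}
```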
The constructor and the close() method, which goes back and seals the compressed wrapper record, look like this:

```java
public Compressor(ByteBuffer buffer, CompressionType type) {
    this.type = type;
    this.initPos = buffer.position();

    this.numRecords = 0;
    this.writtenUncompressed = 0;
    this.compressionRate = 1;
    this.maxTimestamp = Record.NO_TIMESTAMP;

    if (type != CompressionType.NONE) {
        // For compressed batches, skip over the space reserved for the wrapper
        // record's log overhead and header; close() fills it in afterwards.
        buffer.position(initPos + Records.LOG_OVERHEAD + Record.RECORD_OVERHEAD);
    }

    // Create the streams: appendStream compresses into bufferStream.
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

public void close() {
    try {
        appendStream.close();
    } catch (IOException e) {
        throw new KafkaException(e);
    }

    if (type != CompressionType.NONE) {
        ByteBuffer buffer = bufferStream.buffer();
        int pos = buffer.position();
        // Rewind to the reserved space and write the wrapper entry's log
        // overhead; its offset field holds numRecords - 1.
        buffer.position(initPos);
        buffer.putLong(numRecords - 1);
        buffer.putInt(pos - initPos - Records.LOG_OVERHEAD);
        // Write the wrapper record header with null key and value placeholders.
        Record.write(buffer, maxTimestamp, null, null, type, 0, -1);
        // Fill in the size of the compressed value.
        int valueSize = pos - initPos - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD;
        buffer.putInt(initPos + Records.LOG_OVERHEAD + Record.KEY_OFFSET_V1, valueSize);
        // Compute and fill in the CRC over everything from the magic byte on.
        long crc = Record.computeChecksum(buffer,
                initPos + Records.LOG_OVERHEAD + Record.MAGIC_OFFSET,
                pos - initPos - Records.LOG_OVERHEAD - Record.MAGIC_OFFSET);
        Utils.writeUnsignedInt(buffer, initPos + Records.LOG_OVERHEAD + Record.CRC_OFFSET, crc);
        buffer.position(pos);

        // Record this batch's compression rate and fold it into the global
        // per-type estimate with exponential damping.
        this.compressionRate = (float) buffer.position() / this.writtenUncompressed;
        TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
                compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
    }
}
```

MemoryRecords is consumed by iterating over it. Its iterator extends the abstract class AbstractIterator, which implements Iterator's hasNext() and next() on top of a single abstract makeNext() method and tracks four internal states: READY, NOT_READY, DONE, and FAILED.

```java
public abstract class AbstractIterator<T> implements Iterator<T> {

    private static enum State {
        READY, NOT_READY, DONE, FAILED
    }

    private State state = State.NOT_READY;
    private T next;

    @Override
    public boolean hasNext() {
        switch (state) {
            case FAILED:
                throw new IllegalStateException("Iterator is in failed state");
            case DONE:
                return false;
            case READY:
                return true;
            default:
                return maybeComputeNext();
        }
    }

    @Override
    public T next() {
        if (!hasNext())
            throw new NoSuchElementException();
        state = State.NOT_READY;
        if (next == null)
            throw new IllegalStateException("Expected item but none found.");
        return next;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Removal not supported");
    }

    public T peek() {
        if (!hasNext())
            throw new NoSuchElementException();
        return next;
    }

    protected T allDone() {
        state = State.DONE;
        return null;
    }

    protected abstract T makeNext();

    private boolean maybeComputeNext() {
        // Enter FAILED first so that an exception thrown by makeNext()
        // leaves the iterator in a failed state.
        state = State.FAILED;
        next = makeNext();
        if (state == State.DONE)
            return false;
        state = State.READY;
        return true;
    }
}
```
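To see the makeNext()/allDone() contract in action, here is a small demo subclass. The class and names are my own, not from Kafka; it assumes the AbstractIterator above is in the same package.

```java
// Hypothetical demo subclass: yields the elements of an array, then
// calls allDone() to move the iterator into the DONE state.
class ArrayIterator<T> extends AbstractIterator<T> {
    private final T[] items;
    private int index = 0;

    ArrayIterator(T[] items) {
        this.items = items;
    }

    @Override
    protected T makeNext() {
        if (index >= items.length)
            return allDone();   // sets state to DONE and returns null
        return items[index++];
    }

    public static void main(String[] args) {
        ArrayIterator<String> it = new ArrayIterator<>(new String[]{"a", "b", "c"});
        while (it.hasNext())
            System.out.println(it.next());   // prints a, b, c
    }
}
```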
RecordsIterator is the concrete subclass. The constructor below builds the inner (deep) iterator from a compressed wrapper entry; for magic values above v0 it decompresses the whole batch up front so that the relative inner offsets can later be turned into absolute ones:

```java
public static class RecordsIterator extends AbstractIterator<LogEntry> {
    private final ByteBuffer buffer;
    private final DataInputStream stream;
    private final CompressionType type;
    private final boolean shallow;
    private RecordsIterator innerIter;
    private final ArrayDeque<LogEntry> logEntries;
    private final long absoluteBaseOffset;

    // Constructor for the inner (deep) iterator over a compressed wrapper entry.
    public RecordsIterator(LogEntry entry) {
        this.type = entry.record().compressionType();
        this.buffer = entry.record().value();
        this.shallow = true;
        this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer),
                type, entry.record().magic());
        long wrapperRecordOffset = entry.offset();
        if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
            // Inner offsets are relative for v1+: decompress the whole batch
            // now so the absolute base can be derived from the last entry.
            this.logEntries = new ArrayDeque<>();
            long wrapperRecordTimestamp = entry.record().timestamp();
            while (true) {
                try {
                    LogEntry logEntry = getNextEntryFromStream();
                    if (logEntry == null)
                        break;
                    // Inner records inherit the wrapper's timestamp metadata.
                    Record recordWithTimestamp = new Record(logEntry.record().buffer(),
                            wrapperRecordTimestamp,
                            entry.record().timestampType());
                    logEntries.add(new LogEntry(logEntry.offset(), recordWithTimestamp));
                } catch (EOFException e) {
                    break;
                } catch (IOException e) {
                    throw new KafkaException(e);
                }
            }
            this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
        } else {
            this.logEntries = null;
            this.absoluteBaseOffset = -1;
        }
    }

    @Override
    protected LogEntry makeNext() {
        // innerDone() and getNextEntry() are helpers elided from this excerpt:
        // getNextEntry() reads either from the pre-decompressed logEntries
        // queue or directly from the stream.
        if (innerDone()) {
            try {
                LogEntry entry = getNextEntry();
                if (entry == null)
                    return allDone();
                // Convert a relative inner offset into an absolute one.
                if (absoluteBaseOffset >= 0) {
                    long absoluteOffset = absoluteBaseOffset + entry.offset();
                    entry = new LogEntry(absoluteOffset, entry.record());
                }
                CompressionType compression = entry.record().compressionType();
                if (compression == CompressionType.NONE || shallow) {
                    return entry;
                } else {
                    // Compressed wrapper: descend into a nested deep iterator.
                    innerIter = new RecordsIterator(entry);
                    return innerIter.next();
                }
            } catch (EOFException e) {
                return allDone();
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        } else {
            return innerIter.next();
        }
    }

    private LogEntry getNextEntryFromStream() throws IOException {
        // Read the log overhead: an 8-byte offset plus a 4-byte size.
        long offset = stream.readLong();
        int size = stream.readInt();
        if (size < 0)
            throw new IllegalStateException("Record with size " + size);
        ByteBuffer rec;
        if (type == CompressionType.NONE) {
            // Uncompressed: slice the record straight out of the buffer.
            rec = buffer.slice();
            int newPos = buffer.position() + size;
            if (newPos > buffer.limit())
                return null;
            buffer.position(newPos);
            rec.limit(size);
        } else {
            // Compressed: reading from the stream decompresses the bytes.
            byte[] recordBuffer = new byte[size];
            stream.readFully(recordBuffer, 0, size);
            rec = ByteBuffer.wrap(recordBuffer);
        }
        return new LogEntry(offset, new Record(rec));
    }
}
```
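The offset arithmetic is easier to follow with concrete numbers. A tiny self-contained demo, with values made up purely for illustration:

```java
public class OffsetDemo {
    public static void main(String[] args) {
        // Hypothetical numbers: wrapper record at absolute offset 100,
        // containing inner records with relative offsets 0, 1, 2.
        long wrapperRecordOffset = 100L;
        long lastInnerRelativeOffset = 2L;

        // As in the RecordsIterator constructor:
        long absoluteBaseOffset = wrapperRecordOffset - lastInnerRelativeOffset; // 98

        // As in makeNext(): each inner entry's offset is shifted by the base,
        // so the last inner record lands exactly on the wrapper's offset.
        for (long rel = 0; rel <= 2; rel++)
            System.out.println(absoluteBaseOffset + rel); // prints 98, 99, 100
    }
}
```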
Shallow iteration

The caller drives iteration through next(), which AbstractIterator routes to makeNext(). There the iterator first checks innerDone(): if the current deep iteration is finished, it fetches the next wrapper-level entry (via getNextEntry(), and ultimately getNextEntryFromStream()); otherwise it keeps returning messages from innerIter.
Deep iteration

When makeNext() encounters an entry whose record is compressed and shallow is false, it builds a nested RecordsIterator over that entry's value; inside it, getNextEntryFromStream() decompresses the payload and reads the inner messages one at a time.
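The decompression side mirrors the wrapForOutput sketch earlier: Compressor.wrapForInput chooses a decompressing InputStream for the wrapper's value. Again a GZIP-only simplification of my own, not Kafka's code (which also handles Snappy and LZ4):

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

public class DecompressionStreams {
    // Stand-in for Kafka's CompressionType enum (NONE and GZIP only).
    enum Type { NONE, GZIP }

    // Sketch of a wrapForInput-style helper: reads from the wrapper record's
    // value and transparently decompresses as bytes are consumed.
    static DataInputStream wrapForInput(InputStream in, Type type) throws IOException {
        switch (type) {
            case GZIP:
                return new DataInputStream(new GZIPInputStream(in));
            case NONE:
            default:
                return new DataInputStream(in);
        }
    }
}
```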
Message return

The shallow flag decides what the caller sees: shallow iteration returns the wrapper message as-is, while deep iteration returns the decompressed inner messages. Together, these mechanisms give Kafka efficient message compression and decompression, keeping message transfer both efficient and reliable.
Reposted from: http://ejxx.baihongyu.com/