Pull out buffer-chunk logic into separate class

This commit is contained in:
topjohnwu 2021-11-21 02:27:52 -08:00
parent 9ea3169ca9
commit d8b9265484
3 changed files with 126 additions and 105 deletions

View File

@ -476,142 +476,90 @@ private:
} }
}; };
class LZ4_decoder : public cpr_stream { class buf_cpr_stream : public chunk_out_stream {
public:
using chunk_out_stream::chunk_out_stream;
ssize_t writeFully(void *buf, size_t len) override {
return write(buf, len);
}
};
class LZ4_decoder : public buf_cpr_stream {
public: public:
explicit LZ4_decoder(stream_ptr &&base) : explicit LZ4_decoder(stream_ptr &&base) :
cpr_stream(std::move(base)), out_buf(new char[LZ4_UNCOMPRESSED]), buf_cpr_stream(std::move(base), LZ4_COMPRESSED, sizeof(block_sz) + 4),
buf(new char[LZ4_COMPRESSED]), init(false), block_sz(0), buf_off(0) {} out_buf(new char[LZ4_UNCOMPRESSED]), block_sz(0) {}
~LZ4_decoder() override { ~LZ4_decoder() override {
close();
delete[] out_buf; delete[] out_buf;
delete[] buf;
} }
ssize_t write(const void *in, size_t size) override { protected:
size_t ret = 0; ssize_t write_chunk(const void *buf, size_t len) override {
auto inbuf = static_cast<const char *>(in); // This is an error
if (!init) { if (len != chunk_sz)
// Skip magic return -1;
inbuf += 4;
size -= 4; auto in = reinterpret_cast<const char *>(buf);
init = true;
}
for (size_t consumed; size != 0;) {
if (block_sz == 0) { if (block_sz == 0) {
if (buf_off + size >= sizeof(block_sz)) { if (chunk_sz == sizeof(block_sz) + 4) {
consumed = sizeof(block_sz) - buf_off; // Skip the first 4 bytes, which is magic
memcpy(buf + buf_off, inbuf, consumed); memcpy(&block_sz, in + 4, sizeof(block_sz));
memcpy(&block_sz, buf, sizeof(block_sz));
buf_off = 0;
} else { } else {
consumed = size; memcpy(&block_sz, in, sizeof(block_sz));
memcpy(buf + buf_off, inbuf, size);
} }
inbuf += consumed; chunk_sz = block_sz;
size -= consumed; return 0;
} else if (buf_off + size >= block_sz) { } else {
consumed = block_sz - buf_off; int r = LZ4_decompress_safe(in, out_buf, block_sz, LZ4_UNCOMPRESSED);
memcpy(buf + buf_off, inbuf, consumed); chunk_sz = sizeof(block_sz);
inbuf += consumed; block_sz = 0;
size -= consumed; if (r < 0) {
LOGW("LZ4HC decompression failure (%d)\n", r);
int write = LZ4_decompress_safe(buf, out_buf, block_sz, LZ4_UNCOMPRESSED);
if (write < 0) {
LOGW("LZ4HC decompression failure (%d)\n", write);
return -1; return -1;
} }
ret += bwrite(out_buf, write); return bwrite(out_buf, r);
// Reset
buf_off = 0;
block_sz = 0;
} else {
// Copy to internal buffer
memcpy(buf + buf_off, inbuf, size);
buf_off += size;
break;
} }
} }
return ret;
}
private: private:
char *out_buf; char *out_buf;
char *buf; uint32_t block_sz;
bool init;
unsigned block_sz;
int buf_off;
}; };
class LZ4_encoder : public cpr_stream { class LZ4_encoder : public buf_cpr_stream {
public: public:
explicit LZ4_encoder(stream_ptr &&base, bool lg) : explicit LZ4_encoder(stream_ptr &&base, bool lg) :
cpr_stream(std::move(base)), outbuf(new char[LZ4_COMPRESSED]), buf_cpr_stream(std::move(base), LZ4_UNCOMPRESSED),
buf(new char[LZ4_UNCOMPRESSED]), init(false), lg(lg), buf_off(0), in_total(0) {} out_buf(new char[LZ4_COMPRESSED]), lg(lg), in_total(0) {
bwrite("\x02\x21\x4c\x18", 4);
ssize_t write(const void *in, size_t size) override {
size_t ret = 0;
if (!init) {
ret += bwrite("\x02\x21\x4c\x18", 4);
init = true;
}
if (size == 0)
return 0;
in_total += size;
const char *inbuf = (const char *) in;
size_t consumed;
do {
if (buf_off + size >= LZ4_UNCOMPRESSED) {
consumed = LZ4_UNCOMPRESSED - buf_off;
memcpy(buf + buf_off, inbuf, consumed);
inbuf += consumed;
size -= consumed;
buf_off = LZ4_UNCOMPRESSED;
if (int written = write_block(); written < 0)
return -1;
else
ret += written;
// Reset buffer
buf_off = 0;
} else {
// Copy to internal buffer
memcpy(buf + buf_off, inbuf, size);
buf_off += size;
size = 0;
}
} while (size != 0);
return ret;
} }
~LZ4_encoder() override { ~LZ4_encoder() override {
if (buf_off) close();
write_block();
if (lg) if (lg)
bwrite(&in_total, sizeof(in_total)); bwrite(&in_total, sizeof(in_total));
delete[] outbuf; delete[] out_buf;
delete[] buf;
} }
private: protected:
char *outbuf; ssize_t write_chunk(const void *buf, size_t len) override {
char *buf; int r = LZ4_compress_HC((const char *) buf, out_buf, len, LZ4_COMPRESSED, LZ4HC_CLEVEL_MAX);
bool init; if (r == 0) {
bool lg;
int buf_off;
unsigned in_total;
int write_block() {
int written = LZ4_compress_HC(buf, outbuf, buf_off, LZ4_COMPRESSED, LZ4HC_CLEVEL_MAX);
if (written == 0) {
LOGW("LZ4HC compression failure\n"); LOGW("LZ4HC compression failure\n");
return -1; return -1;
} }
bwrite(&written, sizeof(written)); bwrite(&r, sizeof(r));
bwrite(outbuf, written); bwrite(out_buf, r);
return written + sizeof(written); return r + sizeof(r);
} }
private:
char *out_buf;
bool lg;
unsigned in_total;
}; };
stream_ptr get_encoder(format_t type, stream_ptr &&base) { stream_ptr get_encoder(format_t type, stream_ptr &&base) {

View File

@ -35,6 +35,34 @@ protected:
stream_ptr base; stream_ptr base;
}; };
// Buffered output stream, writing in chunks
class chunk_out_stream : public filter_stream {
public:
chunk_out_stream(stream_ptr &&base, size_t buf_sz, size_t chunk_sz)
: filter_stream(std::move(base)), chunk_sz(chunk_sz), buf_sz(buf_sz) {}
chunk_out_stream(stream_ptr &&base, size_t buf_sz = 4096)
: chunk_out_stream(std::move(base), buf_sz, buf_sz) {}
~chunk_out_stream() { delete[] _buf; }
// Reading does not make sense
ssize_t read(void *buf, size_t len) final { return stream::read(buf, len); }
ssize_t write(const void *buf, size_t len) final;
protected:
// Classes inheriting this class has to call close() in the destructor
void close();
virtual ssize_t write_chunk(const void *buf, size_t len) = 0;
size_t chunk_sz;
private:
size_t buf_sz;
size_t buf_off = 0;
uint8_t *_buf = nullptr;
};
// Byte stream that dynamically allocates memory // Byte stream that dynamically allocates memory
class byte_stream : public stream { class byte_stream : public stream {
public: public:

View File

@ -121,6 +121,51 @@ ssize_t filter_stream::write(const void *buf, size_t len) {
return base->write(buf, len); return base->write(buf, len);
} }
ssize_t chunk_out_stream::write(const void *_in, size_t len) {
ssize_t ret = 0;
auto in = static_cast<const uint8_t *>(_in);
while (len) {
if (buf_off + len >= chunk_sz) {
const uint8_t *src;
if (buf_off) {
// Copy the rest of the chunk to internal buffer
src = _buf;
auto copy = chunk_sz - buf_off;
memcpy(_buf + buf_off, in, copy);
in += copy;
len -= copy;
buf_off = 0;
} else {
src = in;
in += chunk_sz;
len -= chunk_sz;
}
auto r = write_chunk(src, chunk_sz);
if (r < 0)
return ret;
ret += r;
} else {
// Buffer internally
if (!_buf) {
_buf = new uint8_t[buf_sz];
}
memcpy(_buf + buf_off, in, len);
buf_off += len;
break;
}
}
return ret;
}
void chunk_out_stream::close() {
if (buf_off) {
write_chunk(_buf, buf_off);
delete[] _buf;
_buf = nullptr;
buf_off = 0;
}
}
byte_stream::byte_stream(uint8_t *&buf, size_t &len) : _buf(buf), _len(len) { byte_stream::byte_stream(uint8_t *&buf, size_t &len) : _buf(buf), _len(len) {
buf = nullptr; buf = nullptr;
len = 0; len = 0;