From 27f7fa7153734e1d10e84e1542b8a08e115abd20 Mon Sep 17 00:00:00 2001 From: topjohnwu Date: Fri, 13 Aug 2021 02:08:56 -0700 Subject: [PATCH] Extend stream support --- native/jni/core/logging.cpp | 23 ++++--- native/jni/magiskboot/compress.cpp | 39 ++++++----- native/jni/utils/include/stream.hpp | 38 ++++++---- native/jni/utils/stream.cpp | 103 +++++++++++++++++++++++----- 4 files changed, 145 insertions(+), 58 deletions(-) diff --git a/native/jni/core/logging.cpp b/native/jni/core/logging.cpp index 9ed29de34..6ec5a8fba 100644 --- a/native/jni/core/logging.cpp +++ b/native/jni/core/logging.cpp @@ -36,7 +36,6 @@ void setup_logfile(bool reset) { // Maximum message length for pipes to transfer atomically #define MAX_MSG_LEN (PIPE_BUF - sizeof(log_meta)) -#define TIME_FMT_LEN 35 static void logfile_writer(int pipefd) { run_finally close_socket([=] { @@ -52,7 +51,12 @@ static void logfile_writer(int pipefd) { stream *strm = new byte_stream(tmp_buf.data, tmp_buf.len); log_meta meta{}; - char buf[MAX_MSG_LEN + TIME_FMT_LEN]; + char buf[MAX_MSG_LEN]; + char aux[64]; + + iovec iov[2]; + iov[0].iov_base = aux; + iov[1].iov_base = buf; for (;;) { // Read meta data @@ -80,6 +84,10 @@ static void logfile_writer(int pipefd) { continue; } + // Read message + if (xread(pipefd, buf, meta.len) != meta.len) + return; + timeval tv; tm tm; gettimeofday(&tv, nullptr); @@ -101,16 +109,15 @@ static void logfile_writer(int pipefd) { type = 'E'; break; } - size_t off = strftime(buf, sizeof(buf), "%m-%d %T", &tm); int ms = tv.tv_usec / 1000; - off += snprintf(buf + off, sizeof(buf) - off, + size_t off = strftime(aux, sizeof(aux), "%m-%d %T", &tm); + off += snprintf(aux + off, sizeof(aux) - off, ".%03d %5d %5d %c : ", ms, meta.pid, meta.tid, type); - // Read message - if (xread(pipefd, buf + off, meta.len) != meta.len) - return; + iov[0].iov_len = off; + iov[1].iov_len = meta.len; - strm->write(buf, off + meta.len); + strm->writev(iov, 2); } } diff --git a/native/jni/magiskboot/compress.cpp b/native/jni/magiskboot/compress.cpp index 157d95968..1b7344cdd 100644 --- a/native/jni/magiskboot/compress.cpp +++ b/native/jni/magiskboot/compress.cpp @@ -25,11 +25,14 @@ class cpr_stream : public filter_stream { public: using filter_stream::filter_stream; using stream::read; + ssize_t writeFully(void *buf, size_t len) override { + return write(buf, len); + } }; class gz_strm : public cpr_stream { public: - int write(const void *buf, size_t len) override { + ssize_t write(const void *buf, size_t len) override { return len ? write(buf, len, Z_NO_FLUSH) : 0; } @@ -67,8 +70,8 @@ private: z_stream strm; uint8_t outbuf[CHUNK]; - int write(const void *buf, size_t len, int flush) { - int ret = 0; + ssize_t write(const void *buf, size_t len, int flush) { + size_t ret = 0; strm.next_in = (Bytef *) buf; strm.avail_in = len; do { @@ -105,7 +108,7 @@ public: class bz_strm : public cpr_stream { public: - int write(const void *buf, size_t len) override { + ssize_t write(const void *buf, size_t len) override { return len ? write(buf, len, BZ_RUN) : 0; } @@ -143,8 +146,8 @@ private: bz_stream strm; char outbuf[CHUNK]; - int write(const void *buf, size_t len, int flush) { - int ret = 0; + ssize_t write(const void *buf, size_t len, int flush) { + size_t ret = 0; strm.next_in = (char *) buf; strm.avail_in = len; do { @@ -181,7 +184,7 @@ public: class lzma_strm : public cpr_stream { public: - int write(const void *buf, size_t len) override { + ssize_t write(const void *buf, size_t len) override { return len ? write(buf, len, LZMA_RUN) : 0; } @@ -229,8 +232,8 @@ private: lzma_stream strm; uint8_t outbuf[CHUNK]; - int write(const void *buf, size_t len, lzma_action flush) { - int ret = 0; + ssize_t write(const void *buf, size_t len, lzma_action flush) { + size_t ret = 0; strm.next_in = (uint8_t *) buf; strm.avail_in = len; do { @@ -274,8 +277,8 @@ public: delete[] outbuf; } - int write(const void *buf, size_t len) override { - int ret = 0; + ssize_t write(const void *buf, size_t len) override { + size_t ret = 0; auto inbuf = reinterpret_cast(buf); if (!outbuf) read_header(inbuf, len); @@ -325,8 +328,8 @@ public: LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); } - int write(const void *buf, size_t len) override { - int ret = 0; + ssize_t write(const void *buf, size_t len) override { + size_t ret = 0; if (!outbuf) ret += write_header(); if (len == 0) @@ -390,8 +393,8 @@ public: delete[] buf; } - int write(const void *in, size_t size) override { - int ret = 0; + ssize_t write(const void *in, size_t size) override { + size_t ret = 0; auto inbuf = static_cast(in); if (!init) { // Skip magic @@ -399,7 +402,7 @@ public: size -= 4; init = true; } - for (int consumed; size != 0;) { + for (size_t consumed; size != 0;) { if (block_sz == 0) { if (buf_off + size >= sizeof(block_sz)) { consumed = sizeof(block_sz) - buf_off; @@ -452,8 +455,8 @@ public: cpr_stream(std::move(base)), outbuf(new char[LZ4_COMPRESSED]), buf(new char[LZ4_UNCOMPRESSED]), init(false), lg(lg), buf_off(0), in_total(0) {} - int write(const void *in, size_t size) override { - int ret = 0; + ssize_t write(const void *in, size_t size) override { + size_t ret = 0; if (!init) { ret += bwrite("\x02\x21\x4c\x18", 4); init = true; diff --git a/native/jni/utils/include/stream.hpp b/native/jni/utils/include/stream.hpp index 210a908b4..ce2230cb4 100644 --- a/native/jni/utils/include/stream.hpp +++ b/native/jni/utils/include/stream.hpp @@ -1,14 +1,19 @@ #pragma once -#include +#include +#include #include #include "../files.hpp" class stream { public: - virtual int read(void *buf, size_t len); - virtual int write(const void *buf, size_t len); + virtual ssize_t read(void *buf, size_t len); + virtual ssize_t readFully(void *buf, size_t len); + virtual ssize_t readv(const iovec *iov, int iovcnt); + virtual ssize_t write(const void *buf, size_t len); + virtual ssize_t writeFully(void *buf, size_t len); + virtual ssize_t writev(const iovec *iov, int iovcnt); virtual off_t seek(off_t off, int whence); virtual ~stream() = default; }; @@ -20,8 +25,11 @@ class filter_stream : public stream { public: filter_stream(stream_ptr &&base) : base(std::move(base)) {} - int read(void *buf, size_t len) override; - int write(const void *buf, size_t len) override; + ssize_t read(void *buf, size_t len) override; + ssize_t write(const void *buf, size_t len) override; + + // Seeking while filtering does not make sense + off_t seek(off_t off, int whence) final { return stream::seek(off, whence); } protected: stream_ptr base; @@ -31,10 +39,11 @@ protected: class byte_stream : public stream { public: byte_stream(uint8_t *&buf, size_t &len); - template - byte_stream(byte *&buf, size_t &len) : byte_stream(reinterpret_cast(buf), len) {} - int read(void *buf, size_t len) override; - int write(const void *buf, size_t len) override; + template + byte_stream(Byte *&buf, size_t &len) : byte_stream(reinterpret_cast(buf), len) {} + + ssize_t read(void *buf, size_t len) override; + ssize_t write(const void *buf, size_t len) override; off_t seek(off_t off, int whence) override; private: @@ -50,8 +59,10 @@ private: class fd_stream : public stream { public: fd_stream(int fd) : fd(fd) {} - int read(void *buf, size_t len) override; - int write(const void *buf, size_t len) override; + ssize_t read(void *buf, size_t len) override; + ssize_t readv(const iovec *iov, int iovcnt) override; + ssize_t write(const void *buf, size_t len) override; + ssize_t writev(const iovec *iov, int iovcnt) override; off_t seek(off_t off, int whence) override; private: @@ -67,8 +78,9 @@ class fp_stream final : public stream { public: fp_stream(FILE *fp = nullptr) : fp(fp, fclose) {} fp_stream(sFILE &&fp) : fp(std::move(fp)) {} - int read(void *buf, size_t len) override; - int write(const void *buf, size_t len) override; + + ssize_t read(void *buf, size_t len) override; + ssize_t write(const void *buf, size_t len) override; off_t seek(off_t off, int whence) override; private: diff --git a/native/jni/utils/stream.cpp b/native/jni/utils/stream.cpp index ae00d56d5..f27085188 100644 --- a/native/jni/utils/stream.cpp +++ b/native/jni/utils/stream.cpp @@ -1,23 +1,27 @@ +#include + #include #include +using namespace std; + static int strm_read(void *v, char *buf, int len) { - auto strm = reinterpret_cast(v); + auto strm = static_cast(v); return strm->read(buf, len); } static int strm_write(void *v, const char *buf, int len) { - auto strm = reinterpret_cast(v); + auto strm = static_cast(v); return strm->write(buf, len); } static fpos_t strm_seek(void *v, fpos_t off, int whence) { - auto strm = reinterpret_cast(v); + auto strm = static_cast(v); return strm->seek(off, whence); } static int strm_close(void *v) { - auto strm = reinterpret_cast(v); + auto strm = static_cast(v); delete strm; return 0; } @@ -28,26 +32,79 @@ sFILE make_stream_fp(stream_ptr &&strm) { return fp; } -int stream::read(void *buf, size_t len) { - LOGE("This stream does not support read\n"); +ssize_t stream::read(void *buf, size_t len) { + LOGE("This stream does not implement read\n"); return -1; } -int stream::write(const void *buf, size_t len) { - LOGE("This stream does not support write\n"); +ssize_t stream::readFully(void *buf, size_t len) { + size_t read_sz = 0; + ssize_t ret; + do { + ret = read((byte *) buf + read_sz, len - read_sz); + if (ret < 0) { + if (errno == EINTR) + continue; + return ret; + } + read_sz += ret; + } while (read_sz != len && ret != 0); + return read_sz; +} + +ssize_t stream::readv(const iovec *iov, int iovcnt) { + size_t read_sz = 0; + for (int i = 0; i < iovcnt; ++i) { + auto ret = readFully(iov[i].iov_base, iov[i].iov_len); + if (ret < 0) + return ret; + read_sz += ret; + } + return read_sz; +} + +ssize_t stream::write(const void *buf, size_t len) { + LOGE("This stream does not implement write\n"); return -1; } +ssize_t stream::writeFully(void *buf, size_t len) { + size_t write_sz = 0; + ssize_t ret; + do { + ret = write((byte *) buf + write_sz, len - write_sz); + if (ret < 0) { + if (errno == EINTR) + continue; + return ret; + } + write_sz += ret; + } while (write_sz != len && ret != 0); + return write_sz; +} + +ssize_t stream::writev(const iovec *iov, int iovcnt) { + size_t write_sz = 0; + for (int i = 0; i < iovcnt; ++i) { + auto ret = writeFully(iov[i].iov_base, iov[i].iov_len); + if (ret < 0) + return ret; + write_sz += ret; + } + return write_sz; +} + off_t stream::seek(off_t off, int whence) { - LOGE("This stream does not support seek\n"); + LOGE("This stream does not implement seek\n"); return -1; } -int fp_stream::read(void *buf, size_t len) { - return fread(buf, 1, len, fp.get()); +ssize_t fp_stream::read(void *buf, size_t len) { + auto ret = fread(buf, 1, len, fp.get()); + return ret ? ret : (ferror(fp.get()) ? -1 : 0); } -int fp_stream::write(const void *buf, size_t len) { +ssize_t fp_stream::write(const void *buf, size_t len) { return fwrite(buf, 1, len, fp.get()); } @@ -55,11 +112,11 @@ off_t fp_stream::seek(off_t off, int whence) { return fseek(fp.get(), off, whence); } -int filter_stream::read(void *buf, size_t len) { +ssize_t filter_stream::read(void *buf, size_t len) { return base->read(buf, len); } -int filter_stream::write(const void *buf, size_t len) { +ssize_t filter_stream::write(const void *buf, size_t len) { return base->write(buf, len); } @@ -68,13 +125,13 @@ byte_stream::byte_stream(uint8_t *&buf, size_t &len) : _buf(buf), _len(len) { len = 0; } -int byte_stream::read(void *buf, size_t len) { - len = std::min(len, _len - _pos); +ssize_t byte_stream::read(void *buf, size_t len) { + len = std::min((size_t) len, _len - _pos); memcpy(buf, _buf + _pos, len); return len; } -int byte_stream::write(const void *buf, size_t len) { +ssize_t byte_stream::write(const void *buf, size_t len) { resize(_pos + len); memcpy(_buf + _pos, buf, len); _pos += len; @@ -116,14 +173,22 @@ void byte_stream::resize(size_t new_pos, bool zero) { } } -int fd_stream::read(void *buf, size_t len) { +ssize_t fd_stream::read(void *buf, size_t len) { return ::read(fd, buf, len); } -int fd_stream::write(const void *buf, size_t len) { +ssize_t fd_stream::readv(const iovec *iov, int iovcnt) { + return ::readv(fd, iov, iovcnt); +} + +ssize_t fd_stream::write(const void *buf, size_t len) { return ::write(fd, buf, len); } +ssize_t fd_stream::writev(const iovec *iov, int iovcnt) { + return ::writev(fd, iov, iovcnt); +} + off_t fd_stream::seek(off_t off, int whence) { return lseek(fd, off, whence); }