Extend stream support

This commit is contained in:
topjohnwu
2021-08-13 02:08:56 -07:00
parent b325aa4555
commit 27f7fa7153
4 changed files with 145 additions and 58 deletions

View File

@@ -1,14 +1,19 @@
#pragma once
#include <stdio.h>
#include <sys/uio.h>
#include <cstdio>
#include <memory>
#include "../files.hpp"
class stream {
public:
virtual int read(void *buf, size_t len);
virtual int write(const void *buf, size_t len);
virtual ssize_t read(void *buf, size_t len);
virtual ssize_t readFully(void *buf, size_t len);
virtual ssize_t readv(const iovec *iov, int iovcnt);
virtual ssize_t write(const void *buf, size_t len);
virtual ssize_t writeFully(void *buf, size_t len);
virtual ssize_t writev(const iovec *iov, int iovcnt);
virtual off_t seek(off_t off, int whence);
virtual ~stream() = default;
};
@@ -20,8 +25,11 @@ class filter_stream : public stream {
public:
filter_stream(stream_ptr &&base) : base(std::move(base)) {}
int read(void *buf, size_t len) override;
int write(const void *buf, size_t len) override;
ssize_t read(void *buf, size_t len) override;
ssize_t write(const void *buf, size_t len) override;
// Seeking while filtering does not make sense
off_t seek(off_t off, int whence) final { return stream::seek(off, whence); }
protected:
stream_ptr base;
@@ -31,10 +39,11 @@ protected:
class byte_stream : public stream {
public:
byte_stream(uint8_t *&buf, size_t &len);
template <class byte>
byte_stream(byte *&buf, size_t &len) : byte_stream(reinterpret_cast<uint8_t *&>(buf), len) {}
int read(void *buf, size_t len) override;
int write(const void *buf, size_t len) override;
template <class Byte>
byte_stream(Byte *&buf, size_t &len) : byte_stream(reinterpret_cast<uint8_t *&>(buf), len) {}
ssize_t read(void *buf, size_t len) override;
ssize_t write(const void *buf, size_t len) override;
off_t seek(off_t off, int whence) override;
private:
@@ -50,8 +59,10 @@ private:
class fd_stream : public stream {
public:
fd_stream(int fd) : fd(fd) {}
int read(void *buf, size_t len) override;
int write(const void *buf, size_t len) override;
ssize_t read(void *buf, size_t len) override;
ssize_t readv(const iovec *iov, int iovcnt) override;
ssize_t write(const void *buf, size_t len) override;
ssize_t writev(const iovec *iov, int iovcnt) override;
off_t seek(off_t off, int whence) override;
private:
@@ -67,8 +78,9 @@ class fp_stream final : public stream {
public:
fp_stream(FILE *fp = nullptr) : fp(fp, fclose) {}
fp_stream(sFILE &&fp) : fp(std::move(fp)) {}
int read(void *buf, size_t len) override;
int write(const void *buf, size_t len) override;
ssize_t read(void *buf, size_t len) override;
ssize_t write(const void *buf, size_t len) override;
off_t seek(off_t off, int whence) override;
private:

View File

@@ -1,23 +1,27 @@
#include <cstddef>
#include <utils.hpp>
#include <stream.hpp>
using namespace std;
static int strm_read(void *v, char *buf, int len) {
auto strm = reinterpret_cast<stream *>(v);
auto strm = static_cast<stream *>(v);
return strm->read(buf, len);
}
static int strm_write(void *v, const char *buf, int len) {
auto strm = reinterpret_cast<stream *>(v);
auto strm = static_cast<stream *>(v);
return strm->write(buf, len);
}
static fpos_t strm_seek(void *v, fpos_t off, int whence) {
auto strm = reinterpret_cast<stream *>(v);
auto strm = static_cast<stream *>(v);
return strm->seek(off, whence);
}
static int strm_close(void *v) {
auto strm = reinterpret_cast<stream *>(v);
auto strm = static_cast<stream *>(v);
delete strm;
return 0;
}
@@ -28,26 +32,79 @@ sFILE make_stream_fp(stream_ptr &&strm) {
return fp;
}
int stream::read(void *buf, size_t len) {
LOGE("This stream does not support read\n");
ssize_t stream::read(void *buf, size_t len) {
LOGE("This stream does not implement read\n");
return -1;
}
int stream::write(const void *buf, size_t len) {
LOGE("This stream does not support write\n");
ssize_t stream::readFully(void *buf, size_t len) {
size_t read_sz = 0;
ssize_t ret;
do {
ret = read((byte *) buf + read_sz, len - read_sz);
if (ret < 0) {
if (errno == EINTR)
continue;
return ret;
}
read_sz += ret;
} while (read_sz != len && ret != 0);
return read_sz;
}
ssize_t stream::readv(const iovec *iov, int iovcnt) {
size_t read_sz = 0;
for (int i = 0; i < iovcnt; ++i) {
auto ret = readFully(iov[i].iov_base, iov[i].iov_len);
if (ret < 0)
return ret;
read_sz += ret;
}
return read_sz;
}
ssize_t stream::write(const void *buf, size_t len) {
LOGE("This stream does not implement write\n");
return -1;
}
ssize_t stream::writeFully(void *buf, size_t len) {
size_t write_sz = 0;
ssize_t ret;
do {
ret = write((byte *) buf + write_sz, len - write_sz);
if (ret < 0) {
if (errno == EINTR)
continue;
return ret;
}
write_sz += ret;
} while (write_sz != len && ret != 0);
return write_sz;
}
ssize_t stream::writev(const iovec *iov, int iovcnt) {
size_t write_sz = 0;
for (int i = 0; i < iovcnt; ++i) {
auto ret = writeFully(iov[i].iov_base, iov[i].iov_len);
if (ret < 0)
return ret;
write_sz += ret;
}
return write_sz;
}
off_t stream::seek(off_t off, int whence) {
LOGE("This stream does not support seek\n");
LOGE("This stream does not implement seek\n");
return -1;
}
int fp_stream::read(void *buf, size_t len) {
return fread(buf, 1, len, fp.get());
ssize_t fp_stream::read(void *buf, size_t len) {
auto ret = fread(buf, 1, len, fp.get());
return ret ? ret : (ferror(fp.get()) ? -1 : 0);
}
int fp_stream::write(const void *buf, size_t len) {
ssize_t fp_stream::write(const void *buf, size_t len) {
return fwrite(buf, 1, len, fp.get());
}
@@ -55,11 +112,11 @@ off_t fp_stream::seek(off_t off, int whence) {
return fseek(fp.get(), off, whence);
}
int filter_stream::read(void *buf, size_t len) {
ssize_t filter_stream::read(void *buf, size_t len) {
return base->read(buf, len);
}
int filter_stream::write(const void *buf, size_t len) {
ssize_t filter_stream::write(const void *buf, size_t len) {
return base->write(buf, len);
}
@@ -68,13 +125,13 @@ byte_stream::byte_stream(uint8_t *&buf, size_t &len) : _buf(buf), _len(len) {
len = 0;
}
int byte_stream::read(void *buf, size_t len) {
len = std::min(len, _len - _pos);
ssize_t byte_stream::read(void *buf, size_t len) {
len = std::min((size_t) len, _len - _pos);
memcpy(buf, _buf + _pos, len);
return len;
}
int byte_stream::write(const void *buf, size_t len) {
ssize_t byte_stream::write(const void *buf, size_t len) {
resize(_pos + len);
memcpy(_buf + _pos, buf, len);
_pos += len;
@@ -116,14 +173,22 @@ void byte_stream::resize(size_t new_pos, bool zero) {
}
}
int fd_stream::read(void *buf, size_t len) {
ssize_t fd_stream::read(void *buf, size_t len) {
return ::read(fd, buf, len);
}
int fd_stream::write(const void *buf, size_t len) {
ssize_t fd_stream::readv(const iovec *iov, int iovcnt) {
return ::readv(fd, iov, iovcnt);
}
ssize_t fd_stream::write(const void *buf, size_t len) {
return ::write(fd, buf, len);
}
ssize_t fd_stream::writev(const iovec *iov, int iovcnt) {
return ::writev(fd, iov, iovcnt);
}
off_t fd_stream::seek(off_t off, int whence) {
return lseek(fd, off, whence);
}