19 #include <sys/types.h> 31 using namespace recordio_helpers;
36 writeLock_(file_,
std::defer_lock),
39 throw std::runtime_error(
"RecordIOWriter: file locked by another process");
50 if (totalLength == 0) {
54 DCHECK_EQ(buf->computeChainDataLength(), totalLength);
57 off_t pos =
filePos_.fetch_add(off_t(totalLength));
59 #if FOLLY_HAVE_PWRITEV 60 auto iov = buf->getIov();
69 DCHECK_EQ(
size_t(bytes), totalLength);
103 namespace recordio_helpers {
109 constexpr
uint32_t kHashSeed = 0xdeadbeef;
111 uint32_t headerHash(
const Header& header) {
113 &header, offsetof(Header, headerHash), kHashSeed);
116 std::pair<size_t, std::size_t> dataLengthAndHash(
const IOBuf* buf) {
119 hasher.
Init(kHashSeed, kHashSeed);
120 for (
auto br : *buf) {
122 hasher.
Update(br.data(), br.size());
126 hasher.
Final(&hash1, &hash2);
128 throw std::invalid_argument(
"Record length must fit in 32 bits");
130 return std::make_pair(len, static_cast<std::size_t>(hash1));
141 throw std::invalid_argument(
"invalid file id");
143 auto lengthAndHash = dataLengthAndHash(buf.get());
144 if (lengthAndHash.first == 0) {
159 Header* header =
reinterpret_cast<Header*
>(buf->writableData());
160 memset(header, 0,
sizeof(Header));
162 header->fileId = fileId;
163 header->dataLength =
uint32_t(lengthAndHash.first);
164 header->dataHash = lengthAndHash.second;
165 header->headerHash = headerHash(*header);
174 const Header* header =
reinterpret_cast<const Header*
>(range.
begin());
177 header->hashFunction != 0 || header->flags != 0 ||
178 (fileId != 0 && header->fileId != fileId) ||
179 header->dataLength > range.
size()) {
182 if (headerHash(*header) != header->headerHash) {
185 range.
reset(range.
begin(), header->dataLength);
186 if (dataHash(range) != header->dataHash) {
189 return {header->fileId, range};
196 reinterpret_cast<const uint8_t*>(&magic),
sizeof(magic));
198 DCHECK_GE(searchRange.
begin(), wholeRange.
begin());
199 DCHECK_LE(searchRange.
end(), wholeRange.
end());
205 while (start < end) {
206 auto p =
ByteRange(start, end +
sizeof(magic)).
find(magicRange);
213 if (!r.record.empty()) {
218 start +=
sizeof(magic);
std::atomic< off_t > filePos_
std::unique_lock< File > writeLock_
constexpr size_t headerSize()
size_t prependHeader(std::unique_ptr< IOBuf > &buf, uint32_t fileId)
static std::unique_ptr< IOBuf > create(std::size_t capacity)
ssize_t pwriteFull(int fd, const void *buf, size_t count, off_t offset)
static uint32_t Hash32(const void *message, size_t length, uint32_t seed)
constexpr detail::Map< Move > move
size_type find(const_range_type str) const
void advance(size_type n)
constexpr size_type size() const
—— Concurrent Priority Queue Implementation ——
void Init(uint64_t seed1, uint64_t seed2)
RecordInfo findRecord(ByteRange searchRange, ByteRange wholeRange, uint32_t fileId)
RecordInfo validateRecord(ByteRange range, uint32_t fileId)
constexpr bool empty() const
static uint64_t Hash64(const void *message, size_t length, uint64_t seed)
std::unordered_map< std::type_index, Entry * > map_
constexpr Iter data() const
constexpr Range< Iter > range(Iter first, Iter last)
constexpr Iter end() const
void Final(uint64_t *hash1, uint64_t *hash2) const
RecordIOWriter(File file, uint32_t fileId=1)
void checkUnixError(ssize_t ret, Args &&...args)
constexpr Iter begin() const
static const size_type npos
Iterator(ByteRange range, uint32_t fileId, off_t pos)
Range< const unsigned char * > ByteRange
void Update(const void *message, size_t length)
void write(std::unique_ptr< IOBuf > buf)
ssize_t pwritevFull(int fd, iovec *iov, int count, off_t offset)
std::pair< ByteRange, off_t > recordAndPos_
RecordIOReader(File file, uint32_t fileId=0)
void reset(Iter start, size_type size)