proxygen
RecordIO.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/io/RecordIO.h>
18 
19 #include <sys/types.h>
20 
21 #include <folly/Exception.h>
22 #include <folly/FileUtil.h>
23 #include <folly/Memory.h>
24 #include <folly/Portability.h>
25 #include <folly/ScopeGuard.h>
26 #include <folly/String.h>
28 
29 namespace folly {
30 
31 using namespace recordio_helpers;
32 
34  : file_(std::move(file)),
35  fileId_(fileId),
36  writeLock_(file_, std::defer_lock),
37  filePos_(0) {
38  if (!writeLock_.try_lock()) {
39  throw std::runtime_error("RecordIOWriter: file locked by another process");
40  }
41 
42  struct stat st;
43  checkUnixError(fstat(file_.fd(), &st), "fstat() failed");
44 
45  filePos_ = st.st_size;
46 }
47 
48 void RecordIOWriter::write(std::unique_ptr<IOBuf> buf) {
49  size_t totalLength = prependHeader(buf, fileId_);
50  if (totalLength == 0) {
51  return; // nothing to do
52  }
53 
54  DCHECK_EQ(buf->computeChainDataLength(), totalLength);
55 
56  // We're going to write. Reserve space for ourselves.
57  off_t pos = filePos_.fetch_add(off_t(totalLength));
58 
59 #if FOLLY_HAVE_PWRITEV
60  auto iov = buf->getIov();
61  ssize_t bytes = pwritevFull(file_.fd(), iov.data(), iov.size(), pos);
62 #else
63  buf->unshare();
64  buf->coalesce();
65  ssize_t bytes = pwriteFull(file_.fd(), buf->data(), buf->length(), pos);
66 #endif
67 
68  checkUnixError(bytes, "pwrite() failed");
69  DCHECK_EQ(size_t(bytes), totalLength);
70 }
71 
73  : map_(std::move(file)), fileId_(fileId) {}
74 
// Iterator constructor: member initialisers and body. The listing's index
// gives the signature as Iterator(ByteRange range, uint32_t fileId, off_t pos).
// NOTE(review): the signature line (listing line 75) and listing line 84 are
// absent from this extract — restore them from the upstream file.
//
// Starts with an empty current record at position 0, then either marks the
// iterator as finished (pos outside the range) or skips the first `pos` bytes
// of the range.
76  : range_(range), fileId_(fileId), recordAndPos_(ByteRange(), 0) {
// The cast to size_t turns a negative pos into a very large value, so a single
// comparison rejects both "past the end" and "negative".
77  if (size_t(pos) >= range_.size()) {
78  // Note that this branch can execute if pos is negative as well.
// Position -1 together with an empty range is the finished state.
79  recordAndPos_.second = off_t(-1);
80  range_.clear();
81  } else {
// Remember the starting offset and drop the bytes before it.
82  recordAndPos_.second = pos;
83  range_.advance(size_t(pos));
85  }
86 }
87 
// Body of the iterator helper that positions the iterator on a record.
// NOTE(review): listing lines 88-89 (the function header and the statement
// that defines `record`) are absent from this extract — restore them from the
// upstream file.
//
// If no record is available, the iterator is put into its finished state;
// otherwise range_ is moved up to the start of the record's header and the
// current record / position pair is updated.
90  if (record.empty()) {
// Nothing found: empty record, position -1, nothing left to scan.
91  recordAndPos_ = std::make_pair(ByteRange(), off_t(-1));
92  range_.clear(); // at end
93  } else {
// Distance from the current scan position to the record's payload.
94  size_t skipped = size_t(record.begin() - range_.begin());
95  DCHECK_GE(skipped, headerSize());
// Subtract the header so `skipped` counts only the bytes before the header.
96  skipped -= headerSize();
97  range_.advance(skipped);
98  recordAndPos_.first = record;
// The stored position moves by the same number of bytes as range_ did.
99  recordAndPos_.second += off_t(skipped);
100  }
101 }
102 
103 namespace recordio_helpers {
104 
106 
107 namespace {
108 
109 constexpr uint32_t kHashSeed = 0xdeadbeef; // for mcurtiss
110 
111 uint32_t headerHash(const Header& header) {
113  &header, offsetof(Header, headerHash), kHashSeed);
114 }
115 
// Walks every buffer in the chain starting at `buf`, summing the data length
// and feeding the bytes to an incremental hasher seeded with kHashSeed.
// Returns {total length, first 64-bit word of the final hash}.
// Throws std::invalid_argument ("Record length must fit in 32 bits") when the
// length guard below fires.
// NOTE(review): listing line 118 (the declaration of `hasher`) and listing
// line 127 (the condition guarding the throw) are absent from this extract —
// restore them from the upstream file.
116 std::pair<size_t, std::size_t> dataLengthAndHash(const IOBuf* buf) {
117  size_t len = 0;
// Both seeds of the hasher are set to the same constant.
119  hasher.Init(kHashSeed, kHashSeed);
// Each iteration yields one contiguous byte range of the chain.
120  for (auto br : *buf) {
121  len += br.size();
122  hasher.Update(br.data(), br.size());
123  }
124  uint64_t hash1;
125  uint64_t hash2;
// Final() produces two words; only hash1 is used below.
126  hasher.Final(&hash1, &hash2);
128  throw std::invalid_argument("Record length must fit in 32 bits");
129  }
130  return std::make_pair(len, static_cast<std::size_t>(hash1));
131 }
132 
133 std::size_t dataHash(ByteRange range) {
134  return hash::SpookyHashV2::Hash64(range.data(), range.size(), kHashSeed);
135 }
136 
137 } // namespace
138 
139 size_t prependHeader(std::unique_ptr<IOBuf>& buf, uint32_t fileId) {
140  if (fileId == 0) {
141  throw std::invalid_argument("invalid file id");
142  }
143  auto lengthAndHash = dataLengthAndHash(buf.get());
144  if (lengthAndHash.first == 0) {
145  return 0; // empty, nothing to do, no zero-length records
146  }
147 
148  // Prepend to the first buffer in the chain if we have room, otherwise
149  // prepend a new buffer.
150  if (buf->headroom() >= headerSize()) {
151  buf->unshareOne();
152  buf->prepend(headerSize());
153  } else {
154  auto b = IOBuf::create(headerSize());
155  b->append(headerSize());
156  b->appendChain(std::move(buf));
157  buf = std::move(b);
158  }
159  Header* header = reinterpret_cast<Header*>(buf->writableData());
160  memset(header, 0, sizeof(Header));
161  header->magic = Header::kMagic;
162  header->fileId = fileId;
163  header->dataLength = uint32_t(lengthAndHash.first);
164  header->dataHash = lengthAndHash.second;
165  header->headerHash = headerHash(*header);
166 
167  return lengthAndHash.first + headerSize();
168 }
169 
171  if (range.size() <= headerSize()) { // records may not be empty
172  return {0, {}};
173  }
174  const Header* header = reinterpret_cast<const Header*>(range.begin());
175  range.advance(sizeof(Header));
176  if (header->magic != Header::kMagic || header->version != 0 ||
177  header->hashFunction != 0 || header->flags != 0 ||
178  (fileId != 0 && header->fileId != fileId) ||
179  header->dataLength > range.size()) {
180  return {0, {}};
181  }
182  if (headerHash(*header) != header->headerHash) {
183  return {0, {}};
184  }
185  range.reset(range.begin(), header->dataLength);
186  if (dataHash(range) != header->dataHash) {
187  return {0, {}};
188  }
189  return {header->fileId, range};
190 }
191 
193 findRecord(ByteRange searchRange, ByteRange wholeRange, uint32_t fileId) {
194  static const uint32_t magic = Header::kMagic;
195  static const ByteRange magicRange(
196  reinterpret_cast<const uint8_t*>(&magic), sizeof(magic));
197 
198  DCHECK_GE(searchRange.begin(), wholeRange.begin());
199  DCHECK_LE(searchRange.end(), wholeRange.end());
200 
201  const uint8_t* start = searchRange.begin();
202  const uint8_t* end =
203  std::min(searchRange.end(), wholeRange.end() - sizeof(Header));
204  // end-1: the last place where a Header could start
205  while (start < end) {
206  auto p = ByteRange(start, end + sizeof(magic)).find(magicRange);
207  if (p == ByteRange::npos) {
208  break;
209  }
210 
211  start += p;
212  auto r = validateRecord(ByteRange(start, wholeRange.end()), fileId);
213  if (!r.record.empty()) {
214  return r;
215  }
216 
217  // No repeated prefix in magic, so we can do better than start++
218  start += sizeof(magic);
219  }
220 
221  return {0, {}};
222 }
223 
224 } // namespace recordio_helpers
225 
226 } // namespace folly
std::atomic< off_t > filePos_
Definition: RecordIO.h:80
std::unique_lock< File > writeLock_
Definition: RecordIO.h:79
constexpr size_t headerSize()
Definition: RecordIO-inl.h:101
size_t prependHeader(std::unique_ptr< IOBuf > &buf, uint32_t fileId)
Definition: RecordIO.cpp:139
static std::unique_ptr< IOBuf > create(std::size_t capacity)
Definition: IOBuf.cpp:229
char b
LogLevel max
Definition: LogLevel.cpp:31
ssize_t pwriteFull(int fd, const void *buf, size_t count, off_t offset)
Definition: FileUtil.cpp:138
static uint32_t Hash32(const void *message, size_t length, uint32_t seed)
Definition: SpookyHashV2.h:84
constexpr detail::Map< Move > move
Definition: Base-inl.h:2567
size_type find(const_range_type str) const
Definition: Range.h:721
void advance(size_type n)
Definition: Range.h:672
STL namespace.
constexpr size_type size() const
Definition: Range.h:431
—— Concurrent Priority Queue Implementation ——
Definition: AtomicBitSet.h:29
void Init(uint64_t seed1, uint64_t seed2)
StringPiece range_
Definition: json.cpp:324
Iterator end() const
Definition: RecordIO-inl.h:66
RecordInfo findRecord(ByteRange searchRange, ByteRange wholeRange, uint32_t fileId)
Definition: RecordIO.cpp:193
RecordInfo validateRecord(ByteRange range, uint32_t fileId)
Definition: RecordIO.cpp:170
void clear()
Definition: Range.h:411
constexpr bool empty() const
Definition: Range.h:443
static uint64_t Hash64(const void *message, size_t length, uint64_t seed)
Definition: SpookyHashV2.h:71
std::unordered_map< std::type_index, Entry * > map_
LogLevel min
Definition: LogLevel.cpp:30
int fd() const
Definition: File.h:85
constexpr Iter data() const
Definition: Range.h:446
constexpr Range< Iter > range(Iter first, Iter last)
Definition: Range.h:1114
auto start
constexpr Iter end() const
Definition: Range.h:455
void Final(uint64_t *hash1, uint64_t *hash2) const
RecordIOWriter(File file, uint32_t fileId=1)
Definition: RecordIO.cpp:33
void checkUnixError(ssize_t ret, Args &&...args)
Definition: Exception.h:101
constexpr Iter begin() const
Definition: Range.h:452
static const size_type npos
Definition: Range.h:197
Iterator(ByteRange range, uint32_t fileId, off_t pos)
Definition: RecordIO.cpp:75
Range< const unsigned char * > ByteRange
Definition: Range.h:1163
void Update(const void *message, size_t length)
void write(std::unique_ptr< IOBuf > buf)
Definition: RecordIO.cpp:48
ssize_t pwritevFull(int fd, iovec *iov, int count, off_t offset)
Definition: FileUtil.cpp:154
std::pair< ByteRange, off_t > recordAndPos_
Definition: RecordIO-inl.h:54
RecordIOReader(File file, uint32_t fileId=0)
Definition: RecordIO.cpp:72
const uint8_t kMagic[2]
Definition: Dump.cpp:28
void reset(Iter start, size_type size)
Definition: Range.h:421