Until recently I could crawl and index web pages for my search engine without a problem. That changed when my database grew beyond what a single RabbitMQ message could hold. If a message in a message queue exceeds its default size, RabbitMQ will throw an error and panic. I could raise the default size, but that would not scale as my database keeps growing. So that users can crawl web pages without having to worry about the message broker crashing, I now split the data into smaller segments before sending it.
Creating Segments
I've implemented a function that splits the data into segments with a maximum segment size (MSS), following the same principle TCP uses when it creates segments. Each segment starts with an 8-byte header: the first 4 bytes hold the sequence number and the next 4 bytes hold the total segment count. The rest of the segment is the payload, a slice of the serialized database query result.
// MSS is the maximum payload size of a segment, in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  // Math.ceil covers the remainder without producing an empty trailing
  // segment when data_length is an exact multiple of MSS
  const segmentCount = Math.ceil(data_length / MSS);
  const segments: Array<ArrayBufferLike> = [];
  for (let i = 0; i < segmentCount; i++) {
    // Each segment starts at i * MSS; the last one may be shorter than MSS
    const start = i * MSS;
    const end = Math.min(start + MSS, data_length);
    const payload = encoded_text.subarray(start, end);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}
function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 8-byte header: 4 bytes for sequenceNum, then 4 bytes for segmentCount
  const header = Buffer.concat([
    convertIntToBuffer(sequenceNum),
    convertIntToBuffer(segmentCount),
  ]);
  return Buffer.concat([header, payload]);
}
function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  // Unsigned little-endian, matching binary.LittleEndian.Uint32 on the consumer
  bytes.writeUInt32LE(int, 0);
  return bytes;
}
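To sanity-check the layout, here is a minimal standalone sketch that works on raw bytes. It is not the code above: `segmentBytes` is a hypothetical name, and it uses `DataView` instead of Node's `Buffer` so it runs without any Node-specific types. It also assumes that empty input should still produce one segment.

```typescript
// Hypothetical standalone helper: splits raw bytes into segments laid out as
// [4-byte sequence number][4-byte total count][payload], little-endian.
function segmentBytes(data: Uint8Array, mss: number): Uint8Array[] {
  if (mss <= 0 || Math.floor(mss) !== mss) {
    throw new RangeError("mss must be a positive integer");
  }
  const HEADER_SIZE = 8;
  // Assumption of this sketch: empty input still yields one (empty) segment
  const total = Math.max(1, Math.ceil(data.byteLength / mss));
  const segments: Uint8Array[] = [];
  for (let seq = 0; seq < total; seq++) {
    const start = seq * mss;
    const end = Math.min(start + mss, data.byteLength);
    const payload = data.subarray(start, end);
    const segment = new Uint8Array(HEADER_SIZE + payload.byteLength);
    const view = new DataView(segment.buffer);
    view.setUint32(0, seq, true); // sequence number
    view.setUint32(4, total, true); // total segment count
    segment.set(payload, HEADER_SIZE);
    segments.push(segment);
  }
  return segments;
}

// 10 bytes with an MSS of 4 gives payloads of 4, 4 and 2 bytes
const demoSegments = segmentBytes(Uint8Array.from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), 4);
const demoSizes = demoSegments.map((s) => s.byteLength); // [12, 12, 10]
```

Note that MSS only limits the payload. Every message on the wire is 8 bytes larger than its payload, which is worth keeping in mind when picking an MSS close to the broker's limit.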
Parsing incoming segments
Splitting a large dataset into small segments like this lets the database query keep scaling even as the database grows.
Now, how does the search engine parse the buffers and turn the segments back into an array of web pages?
Reading from segment buffers
First, extract the segment header. The header contains two properties, namely Sequence number and Total Segments.
// Needs the "encoding/binary" and "fmt" packages.
func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
	// The header is 8 bytes: two little-endian uint32 values
	if len(buf) < 8 {
		return &SegmentHeader{}, fmt.Errorf("segment is %d bytes, shorter than the 8 byte header", len(buf))
	}
	return &SegmentHeader{
		SequenceNum:   binary.LittleEndian.Uint32(buf[0:4]),
		TotalSegments: binary.LittleEndian.Uint32(buf[4:8]),
	}, nil
}
func GetSegmentPayload(buf []byte) ([]byte, error) {
	headerOffset := 8
	// Guard against slicing past the end of a truncated segment
	if len(buf) < headerOffset {
		return nil, fmt.Errorf("segment is %d bytes, shorter than the 8 byte header", len(buf))
	}
	// Everything after the header is payload
	return buf[headerOffset:], nil
}
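The same layout can be checked from the producer side before anything is published. The sketch below is a TypeScript mirror of the two Go functions above, not part of the original service; `readSegment` and `SegmentHeaderView` are hypothetical names.

```typescript
interface SegmentHeaderView {
  sequenceNum: number;
  totalSegments: number;
}

// Hypothetical producer-side mirror of GetSegmentHeader / GetSegmentPayload.
function readSegment(segment: Uint8Array): { header: SegmentHeaderView; payload: Uint8Array } {
  const HEADER_SIZE = 8;
  if (segment.byteLength < HEADER_SIZE) {
    throw new RangeError("segment is shorter than the 8-byte header");
  }
  // byteOffset matters when `segment` is a view into a larger buffer
  const view = new DataView(segment.buffer, segment.byteOffset, segment.byteLength);
  return {
    header: {
      sequenceNum: view.getUint32(0, true), // little-endian
      totalSegments: view.getUint32(4, true),
    },
    payload: segment.subarray(HEADER_SIZE),
  };
}

// Sequence number 1 of 3, followed by the payload bytes 0xAA 0xBB
const sampleSegment = Uint8Array.from([1, 0, 0, 0, 3, 0, 0, 0, 0xaa, 0xbb]);
const parsedSample = readSegment(sampleSegment);
// parsedSample.header is { sequenceNum: 1, totalSegments: 3 }
```

Reading both fields as unsigned 32-bit little-endian values is what keeps the two sides in agreement; a mismatch in signedness or byte order would only show up once the numbers get large.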
Handling retransmission and requeuing of segments
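The sequence number is used for retransmission and requeuing of segments. If the received sequence number is not the expected one, the segment is rejected and requeued so that RabbitMQ delivers it again. Because Nack is called with multiple set to true, any earlier deliveries on the channel that are still unacknowledged are requeued along with it.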
// for retransmission/requeuing
if segmentHeader.SequenceNum != expectedSequenceNum {
	ch.Nack(data.DeliveryTag, true, true)
	log.Printf("Expected Sequence number %d, got %d\n",
		expectedSequenceNum, segmentHeader.SequenceNum)
	continue
}
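The accept-or-requeue decision is easier to see in isolation. The following is a simplified in-memory model written in TypeScript, with the hypothetical name `drainInOrder`. It does not reproduce how RabbitMQ orders redelivered messages; it only assumes a requeued segment eventually comes back.

```typescript
// Simplified in-memory model of the consumer loop. It is NOT how RabbitMQ
// orders redelivered messages; it only shows the accept/requeue decision.
function drainInOrder(
  deliveries: number[],
  maxSteps: number = 1000,
): { accepted: number[]; requeues: number } {
  const queue = deliveries.slice();
  const accepted: number[] = [];
  let expectedSequenceNum = 0;
  let requeues = 0;
  let steps = 0;
  while (queue.length > 0) {
    if (steps++ >= maxSteps) {
      // Without a guard, a missing segment would make this loop spin forever
      throw new Error("gave up: a segment is probably missing");
    }
    const sequenceNum = queue.shift() as number;
    if (sequenceNum !== expectedSequenceNum) {
      // Stand-in for Nack with requeue: put it back and try again later
      queue.push(sequenceNum);
      requeues++;
      continue;
    }
    // Stand-in for Ack: accept the segment and advance
    accepted.push(sequenceNum);
    expectedSequenceNum++;
  }
  return { accepted, requeues };
}

const drained = drainInOrder([0, 2, 1]);
// drained.accepted is [0, 1, 2], drained.requeues is 1
```

The `maxSteps` guard is an addition of this sketch. It points at a real design question for the consumer as well: if a segment never arrives, requeuing alone keeps cycling the out-of-order segments.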
Appending segment payloads
The total segment count tells the search engine when to stop listening to the producer (the database service). The payload of each accepted segment is appended to a web page buffer that holds the bytes from all incoming segments. Once the number of segments received equals the total that the database service announced, the search engine breaks out of the loop and parses the aggregated buffer. Until then it keeps listening.
segmentCounter++
fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
fmt.Printf("current segments : %d\n", segmentCounter)
expectedSequenceNum++
ch.Ack(data.DeliveryTag, false)
webpageBytes = append(webpageBytes, segmentPayload...)
fmt.Printf("Byte Length: %d\n", len(webpageBytes))
if segmentCounter == segmentHeader.TotalSegments {
	log.Printf("Got all segments from Database %d", segmentCounter)
	break
}
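The append-until-complete step can be sketched without a broker. This TypeScript version uses the hypothetical name `reassemblePayload` and assumes the segments are already in sequence order; throwing on an incomplete set is a choice of this sketch, not something the consumer above does.

```typescript
// Hypothetical helper: joins the payloads of segments that are already in
// sequence order and stops once the count announced in the header is reached.
function reassemblePayload(segments: Uint8Array[]): Uint8Array {
  const HEADER_SIZE = 8;
  const chunks: Uint8Array[] = [];
  let byteLength = 0;
  let segmentCounter = 0;
  let totalSegments = 0;
  for (let i = 0; i < segments.length; i++) {
    const segment = segments[i];
    const view = new DataView(segment.buffer, segment.byteOffset, segment.byteLength);
    totalSegments = view.getUint32(4, true);
    const payload = segment.subarray(HEADER_SIZE);
    chunks.push(payload);
    byteLength += payload.byteLength;
    segmentCounter++;
    if (segmentCounter === totalSegments) {
      break; // got everything the producer announced
    }
  }
  if (segments.length === 0 || segmentCounter !== totalSegments) {
    throw new Error("incomplete: received " + segmentCounter + " of " + totalSegments + " segments");
  }
  // Copy all payload chunks into one contiguous buffer
  const joined = new Uint8Array(byteLength);
  let offset = 0;
  for (let i = 0; i < chunks.length; i++) {
    joined.set(chunks[i], offset);
    offset += chunks[i].byteLength;
  }
  return joined;
}

// Two segments carrying the ASCII text "[1,2]" split after the third byte
const partA = Uint8Array.from([0, 0, 0, 0, 2, 0, 0, 0, 91, 49, 44]);
const partB = Uint8Array.from([1, 0, 0, 0, 2, 0, 0, 0, 50, 93]);
const rejoined = reassemblePayload([partA, partB]);
// rejoined holds the 5 bytes 91 49 44 50 93
```

Only after the last segment has arrived do the joined bytes form the complete JSON that the producer serialized, so parsing has to wait until the counter matches the total.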
I use vim btw
Thank you for coming to my ted talk, I will be implementing more features and fixes for zensearch.