
Alternative to MapReduce for search in distributed databases

Author: Satbek Turganbayev

Hello, my name is Satbek, and I'm part of the Tarantool team. I will tell you how to implement search in a sharded cluster whose speed does not depend on the number of masters or the amount of stored data. I call this method an index layer. In this article:

• I will describe the general process of developing a search algorithm;
• I will give an example of its implementation;
• I will also give some recommendations for its development.

Throughout the article, I implement, step by step, a simple CRUD service with sharded storage, along with data search. This will help you better understand the problems that arise with distributed search and how the index layer solves them.

We will use the Tarantool database (version ≥ 1.10) and the Tarantool Cartridge clustering framework (version 2.7.0).

For a better understanding, it helps to know a little about the Tarantool Cartridge framework, the vshard module, and the Lua language, since the example is written in it.

General description of the application

As an example, let's implement a simple CRUD service with one user table sharded by id. (Tarantool uses the term space for a table.) You can find the source code for the application here.

The required data will be stored in the user space in the following format:

|id(uuid)|bucket_id(number)|name(string)|birthdate(number)|phone_number(string)|

The users are sharded by id. There is a primary index by id and secondary index by name.
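For reference, here is a minimal sketch of how such a space could be defined on the storage side. This is an assumption based on the schema above, not the application's actual migration code; the id is kept as a string here, and the exact parts syntax depends on the Tarantool version:

local function init_user_space()
    local user = box.schema.space.create('user', {if_not_exists = true})
    user:format({
        {name = 'id', type = 'string'},          -- uuid kept as a string
        {name = 'bucket_id', type = 'unsigned'}, -- vshard sharding key
        {name = 'name', type = 'string'},
        {name = 'birthdate', type = 'number'},
        {name = 'phone_number', type = 'string'},
    })
    -- primary index by id
    user:create_index('id', {
        parts = {{field = 'id', type = 'string'}},
        if_not_exists = true,
    })
    -- non-unique secondary index by name
    user:create_index('name', {
        parts = {{field = 'name', type = 'string'}},
        unique = false,
        if_not_exists = true,
    })
    -- vshard needs an index on bucket_id for rebalancing
    user:create_index('bucket_id', {
        parts = {{field = 'bucket_id', type = 'unsigned'}},
        unique = false,
        if_not_exists = true,
    })
end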

How do we search in such a cluster?

Search by id

-- router side
local vshard = require('vshard')

-- the bucket id is derived from the sharding key, so the router knows
-- exactly which storage to ask
local bucket_id = vshard.router.bucket_id(user_id)
local user = vshard.router.callrw(bucket_id, 'get_user_by_id', {user_id}, {timeout = 1})

-- storage side
local M = {}

function M.get_user_by_id(id)
    local user_t = get_user_tuple_by_id(id) -- local lookup by the primary index
    if user_t == nil then
        return nil, M.user_not_exists_err:new('id=%s', id)
    end
    return user_tuple_to_output(user_t)
end

To search by id, we use the standard Tarantool approach. We calculate the bucket_id on the router side to find out which storage node our data is on, and then send a request there. As a result, we get what we are looking for in no more than two queries.


Search by name

Next, we implement a search by name. Here, one request is not enough: we need to access each master storage in our cluster, since we don’t know where our data is.

-- router side
local cartridge_rpc = require('cartridge.rpc')
local cartridge_pool = require('cartridge.pool')

local M = {}

function M.find_users_by_name(name)
    -- collect the URIs of all storage masters and query each of them
    local storage_uris = cartridge_rpc.get_candidates('app.roles.storage', {leader_only = true})
    local res_by_uri, err = cartridge_pool.map_call('storage_api.get_users_by_name', {name}, {uri_list = storage_uris, timeout = M.vshard_timeout})

    if err ~= nil then
        return nil, err
    end

    -- merge the per-storage results into a single list
    local result = {}
    for _, res in pairs(res_by_uri) do
        for _, user in pairs(res) do
            table.insert(result, user)
        end
    end
    return result
end

-- storage side
local fiber = require('fiber')

local M = {}

function M.get_users_by_name(user_name)
    local yield_every = 100
    local count = 1

    local result = {}
    for _, t in box.space.user.index.name:pairs({user_name}, 'EQ') do
        -- yield periodically so a long scan does not block the event loop
        if count % yield_every == 0 then
            fiber.yield()
        end

        count = count + 1
        table.insert(result, user_tuple_to_output(t))
    end

    return result
end

In this case, the number of requests is no less than the number of masters in the cluster. However, this approach is acceptable if the required data is known to be spread across several masters and there are few masters overall. Otherwise, an additional index for the name field is required for an efficient search. For a search by a person's name, these assumptions hold once there is a sufficient amount of data, since many users share the same name.

Widening the search

Now let's say you also want to search for a user by phone number. To do this, you can reuse the search-by-name approach described above: add a secondary index on the storages and implement MapReduce for the search. This algorithm works and is relatively easy to implement, especially with the help of the Tarantool/crud module. However, it has a significant drawback: a given phone number, as a rule, belongs to only one user entry, so the entry with the desired value is located on only one master. But the MapReduce algorithm has to visit every master in the cluster: with two masters you make two read requests, with 10 masters you make 10 requests, and so on. All of these are network requests, which are very expensive compared to an index read, and any of them can fail. This leads to a new problem: redundant network requests when searching by unique fields.

Index layer structure

The difficulty here is that we need to find unique data, but we do not know which master it is stored on. So we need to store this information ourselves: we spend memory to save on network requests.

Let's create a special user_search_index space:

|user_id(string-uuid)|bucket_id(unsigned)|data_hash(string)|data(any)|
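A minimal sketch of how this space might be defined follows. The index named hash matches the lookup code later in the article; making the pair (data_hash, user_id) the primary key is an assumption of this sketch, chosen so that one user can have several index entries:

local function init_user_search_index_space()
    local space = box.schema.space.create('user_search_index', {if_not_exists = true})
    space:format({
        {name = 'user_id', type = 'string'},
        {name = 'bucket_id', type = 'unsigned'},
        {name = 'data_hash', type = 'string'},
        {name = 'data', type = 'any'},
    })
    -- several entries may share a data_hash (collisions), so the hash alone
    -- cannot be unique; user_id disambiguates
    space:create_index('hash', {
        parts = {
            {field = 'data_hash', type = 'string'},
            {field = 'user_id', type = 'string'},
        },
        if_not_exists = true,
    })
    -- vshard needs an index on bucket_id for rebalancing
    space:create_index('bucket_id', {
        parts = {{field = 'bucket_id', type = 'unsigned'}},
        unique = false,
        if_not_exists = true,
    })
end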

This space will be the search layer. When writing data, we will calculate the data_hash of the fields used for the search:

-- one of the many options for doing this
local msgpack = require('msgpack')
local digest = require('digest')

local data = {'phone-number', '202-555-0165'}
local data_hash = digest.md5_hex(msgpack.encode(data))

The user_search_index space is sharded by data_hash. To avoid treating hash collisions as matches, the original data must be stored as well and compared on lookup. Here you can find the application module that implements the hash-building logic:

local M = {}

local digest = require('digest')
local msgpack = require('msgpack')
M.cmp_data = {}

local Identifier = {}

function Identifier:new(data)
    local obj = {}
    obj.hash = digest.md5_hex(msgpack.encode(data))
    obj.data = data
    return obj
end

function M.phone_number(phone_number)
    local data = {'phone_number', phone_number}
    return Identifier:new(data)
end

function M.cmp_data.phone_number(data_one, data_two)
    -- field-by-field comparison resolves possible hash collisions
    return #data_one == 2 and #data_two == 2 and
            data_one[1] == data_two[1] and
            data_one[2] ~= nil and data_one[2] == data_two[2]
end

return M

Read

While searching, we:

• calculate the hash of the search data;
• use it to find data in user_search_index;
• send a request to the storage and get the data by primary key.

Example code for searching by phone number:

-- api.lua
local errors = require('errors')

local M = {}
-- M.search_index points to the search_api module below

function M.find_users_by_phone_number(phone_number)
    local user_ids, err = M.search_index.user_id.get_by_phone_number(phone_number)
    if err ~= nil then
        err = errors.wrap(err)
        return nil, err
    end

    -- point reads by primary key, as in the search-by-id example
    return get_users_by_ids(user_ids)
end

-- search_api.lua
local errors = require('errors')

local M = {}
M.user_id = {}
-- M.identifier, M.vshard_router, and M.vshard_timeout are set at role initialization

function M.user_id.get_by_phone_number(phone_number)
    local identifier = M.identifier.phone_number(phone_number)

    -- the index layer is sharded by the hash, so the hash determines the bucket
    local bucket_id = M.vshard_router.bucket_id_mpcrc32(identifier.hash)

    local ids, err = M.vshard_router.callrw(bucket_id, 'search_storage_api.user_id.get_by_phone_number',
        {identifier.hash, identifier.data}, {timeout = M.vshard_timeout}
    )
    if err ~= nil then
        err = errors.wrap(err)
        return nil, err
    end
    return ids
end

-- search_storage_api.lua
local errors = require('errors')

local not_found_err = errors.new_class('NotFoundError')

local M = {}
M.user_id = {}
-- M.cmp_data is wired to the comparison functions from identifier.lua

local function get_users_by_hash(hash, data, cmp_func)
    local result = {}
    for _, t in box.space.user_search_index.index.hash:pairs({hash}, 'EQ') do
        if t.data_hash ~= hash then
            break
        end
        -- compare the stored data with the requested one to rule out collisions
        if cmp_func(t.data, data) then
            result[#result + 1] = t.user_id
        end
    end
    if #result == 0 then
        return nil, not_found_err:new()
    end
    return result
end

function M.user_id.get_by_phone_number(hash, data)
    return get_users_by_hash(hash, data, M.cmp_data.phone_number)
end

Here we managed without MapReduce. For unique data, we make one network request to find the primary key, and then another one to get the desired data.

Indexing arbitrary data

The index layer allows you to move indexing from the database level to the application code level. This opens up great possibilities for indexing arbitrary data. For example, it is quite easy to make a composite index based on name and date of birth.

-- identifier.lua

local M = {}

local digest = require('digest')
local msgpack = require('msgpack')
M.cmp_data = {}

local Identifier = {}

function Identifier:new(data)
    local obj = {}
    obj.hash = digest.md5_hex(msgpack.encode(data))
    obj.data = data
    return obj
end

...

function M.name_birthdate(name, birthdate)
    local data = {'name', name, 'birthdate', birthdate}
    return Identifier:new(data)
end

function M.cmp_data.name_birthdate(data_one, data_two)
    -- field-by-field comparison resolves possible hash collisions
    return #data_one == 4 and #data_two == 4 and
            data_one[1] == data_two[1] and
            data_one[2] ~= nil and data_one[2] == data_two[2] and
            data_one[3] == data_two[3] and
            data_one[4] ~= nil and data_one[4] == data_two[4]
end

return M

The rest is identical to searching by phone number.
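For illustration, the router-side search function would differ from get_by_phone_number only in the identifier it builds; here is a sketch that mirrors the code above (get_by_name_birthdate on the storage side is assumed to be a copy of its phone-number counterpart with a different comparison function):

-- search_api.lua (sketch)
function M.user_id.get_by_name_birthdate(name, birthdate)
    local identifier = M.identifier.name_birthdate(name, birthdate)

    local bucket_id = M.vshard_router.bucket_id_mpcrc32(identifier.hash)

    local ids, err = M.vshard_router.callrw(bucket_id, 'search_storage_api.user_id.get_by_name_birthdate',
        {identifier.hash, identifier.data}, {timeout = M.vshard_timeout}
    )
    if err ~= nil then
        return nil, errors.wrap(err)
    end
    return ids
end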

Write

Earlier, I described how to implement search using the index layer. Now let's look at how the layer is built. When you create or update data, you must also create or update the index layer. The implementation depends on the specific task, but most likely it will happen in the background; the Tarantool/queue and Tarantool/sharded-queue modules can help here. The example application contains a naive implementation of building the index layer with Tarantool/queue; a simplified sketch of the idea follows.
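This sketch uses the tarantool/queue API; the helpers user_to_tuple and update_search_index are hypothetical placeholders for application-specific code:

-- a naive sketch of background index-layer building with tarantool/queue
local queue = require('queue')
local fiber = require('fiber')

queue.create_tube('search_index_updates', 'fifo', {if_not_exists = true})

-- on write: save the user first, then enqueue an index-layer update
local function create_user(user)
    box.space.user:insert(user_to_tuple(user)) -- hypothetical helper
    queue.tube.search_index_updates:put({user.id, user.phone_number})
end

-- background fiber: drain the queue and write index-layer entries
local function index_builder()
    while true do
        local task = queue.tube.search_index_updates:take(1) -- waits up to 1 second
        if task ~= nil then
            local user_id, phone_number = task[3][1], task[3][2]
            -- update_search_index is a hypothetical helper that builds the
            -- identifier and writes it to user_search_index via vshard
            local ok = pcall(update_search_index, user_id, phone_number)
            if ok then
                queue.tube.search_index_updates:ack(task[1])
            else
                -- return the task to the queue to retry later
                queue.tube.search_index_updates:release(task[1], {})
            end
        end
    end
end

fiber.create(index_builder)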

Implementing the index layer complicates data writing and updating, but significantly speeds up reading. In our case, the index layer is updated in the background after the data itself has been written.

Because of that, a search may occasionally return nothing even though the data has already been saved to the database. And after an update, a search by the old values will still find the record and return its new content, because the data has changed but the search layer has not yet caught up. Eventually, though, the data becomes consistent. Depending on your case, you may need the opposite order: first update the index layer, and then the data.

Conclusion

The index layer:

• Significantly reduces the number of network read requests when searching for unique data.
• Lets you avoid storing derived data just to be able to search by it.
• Helps implement fast search by any unique data.

You can download Tarantool on the official website and get help in the Telegram chat.
