Author: Satbek Turganbayev
Hello, my name is Satbek, and I'm part of the Tarantool team. I will show you how to implement a search in a sharded cluster whose speed depends neither on the number of masters nor on the amount of stored data. I call this method an index layer:
• I will describe the general process of developing a search algorithm;
• I will give an example of the implementation;
• and I will give some recommendations for its development.
In this article, I implement, step by step, a simple CRUD service with sharded storage, as well as a data search. This will help you better understand the problems that arise with distributed search and how the index layer solves them.
We will use the Tarantool database (version ≥ 1.10), as well as the Tarantool-Cartridge clustering framework (version 2.7.0).
For a better understanding, you should know a little about the Tarantool Cartridge framework, the vshard module, and the Lua language, since the example is written in it.
General description of the application
As an example, let's implement a simple CRUD service with one user table sharded by id (Tarantool uses the term space for tables). You can find the source code for the application here.
The required data will be stored in the user space in the following format:
|id(uuid)|bucket_id(number)|name(string)|birthdate(number)|phone_number(string)|
The users are sharded by id. There is a primary index by id and a secondary index by name.
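For reference, here is a minimal sketch of how such a space and its indexes could be declared on a storage node. This is an illustration, not the application's actual migration code (the uuid field type and name-based index parts require Tarantool 2.x; on 1.10, store the id as a string and address fields by number):
-- storage side; a sketch, not the application's actual migrations
local user = box.schema.space.create('user', {if_not_exists = true})
user:format({
    {name = 'id', type = 'uuid'},
    {name = 'bucket_id', type = 'unsigned'},
    {name = 'name', type = 'string'},
    {name = 'birthdate', type = 'number'},
    {name = 'phone_number', type = 'string'},
})
-- primary index by id, secondary (non-unique) index by name
user:create_index('id', {parts = {'id'}, unique = true, if_not_exists = true})
user:create_index('name', {parts = {'name'}, unique = false, if_not_exists = true})
-- vshard needs an index on bucket_id for rebalancing
user:create_index('bucket_id', {parts = {'bucket_id'}, unique = false, if_not_exists = true})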
How do we search in such a cluster?
Search by id
-- router side
local vshard = require('vshard')

local bucket_id = vshard.router.bucket_id(user_id)
local user, err = vshard.router.callrw(bucket_id, 'get_user_by_id', {user_id}, {timeout = 1})

-- storage side
local M = {}

function M.get_user_by_id(id)
    local user_t = get_user_tuple_by_id(id)
    if user_t == nil then
        return nil, M.user_not_exists_err:new('id=%s', id)
    end
    return user_tuple_to_output(user_t)
end
To search by id, we use the standard Tarantool approach: we calculate the bucket_id on the router side to find out which storage node holds our data and then send the request there. As a result, we get what we are looking for in no more than two requests.
Search by name
Next, we implement a search by name. Here, one request is not enough: we need to access each master storage in our cluster, since we don’t know where our data is.
-- router side
local cartridge_pool = require('cartridge.pool')
local cartridge_rpc = require('cartridge.rpc')

local M = {}

function M.find_users_by_name(name)
    -- collect the URIs of all master storages in the cluster
    local storage_uris = cartridge_rpc.get_candidates('app.roles.storage', {leader_only = true})
    -- run the search on every master in parallel
    local res_by_uri, err = cartridge_pool.map_call('storage_api.get_users_by_name', {name},
        {uri_list = storage_uris, timeout = M.vshard_timeout})
    if err ~= nil then
        return nil, err
    end
    local result = {}
    for _, res in pairs(res_by_uri) do
        for _, user in pairs(res) do
            table.insert(result, user)
        end
    end
    return result
end
-- storage side
local fiber = require('fiber')

local M = {}

function M.get_users_by_name(user_name)
    local yield_every = 100
    local count = 1
    local result = {}
    for _, t in box.space.user.index.name:pairs({user_name}, {iterator = 'EQ'}) do
        -- yield periodically so a long scan does not block the event loop
        if count % yield_every == 0 then
            fiber.yield()
        end
        count = count + 1
        table.insert(result, user_tuple_to_output(t))
    end
    return result
end
In this case, the number of requests is no less than the number of masters in the cluster. Still, this approach is acceptable if the required data is known to be spread across several masters and there are few masters. Otherwise, an additional index on the name field is required to search efficiently. For a search by a person's name, the assumptions above hold once there is a sufficient amount of data.
Widening the search
Now let's say you also want to search for a user by phone number. You can reuse the search-by-name approach described above: add a secondary index on the storages and implement MapReduce for the search. This algorithm works and is relatively easy to implement, especially with the help of the Tarantool/crud module. However, it has a significant drawback: a phone number, as a rule, appears in only one user entry, so the entry with the desired value is located on only one master. Yet the MapReduce algorithm has to visit every master in the cluster: with two masters you make two read requests, with 10 masters, 10 requests, and so on. All of these are network requests, which are very expensive compared to an index read, and any of them can fail. This leads to a new problem: redundant network requests when searching by unique fields.
Index layer structure
The difficulty here is that we need to find unique data, but we do not know which master stores it, so we have to keep that information somewhere. In other words, we spend memory to save on network requests.
Let's create a special user_search_index space:
|user_id(string-uuid)|bucket_id(unsigned)|data_hash(string)|data(any)|
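Here is a possible declaration of this space on the storages, again a sketch under the same assumptions rather than the application's actual code:
-- storage side; a sketch, not the application's actual migrations
local search = box.schema.space.create('user_search_index', {if_not_exists = true})
search:format({
    {name = 'user_id', type = 'string'},      -- UUID stored as a string
    {name = 'bucket_id', type = 'unsigned'},
    {name = 'data_hash', type = 'string'},
    {name = 'data', type = 'any'},
})
-- different data may hash to the same value, so the primary key
-- combines the hash with user_id to keep every tuple addressable
search:create_index('hash', {parts = {'data_hash', 'user_id'}, unique = true, if_not_exists = true})
search:create_index('bucket_id', {parts = {'bucket_id'}, unique = false, if_not_exists = true})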
This space will be the search layer. When writing data, we will calculate data_hash of the information used for search:
-- one of the many options for doing this
local msgpack = require('msgpack')
local digest = require('digest')
local data = {'phone-number', '202-555-0165'}
local data_hash = digest.md5_hex(msgpack.encode(data))
The user_search_index space is sharded by data_hash. To guard against hash collisions, you also need to store the data itself and compare it on lookup. Here you can find the application module that implements the hash-building logic:
local digest = require('digest')
local msgpack = require('msgpack')

local M = {}
M.cmp_data = {}

local Identifier = {}

function Identifier:new(data)
    local obj = {}
    obj.hash = digest.md5_hex(msgpack.encode(data))
    obj.data = data
    return obj
end

function M.phone_number(phone_number)
    local data = {'phone_number', phone_number}
    return Identifier:new(data)
end

function M.cmp_data.phone_number(data_one, data_two)
    return #data_one == 2 and #data_two == 2 and
        data_one[1] == data_two[1] and
        data_one[2] == data_two[2]
end

return M
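A quick usage sketch (the module path and the phone number are made up):
local identifier = require('app.identifier')  -- hypothetical module path

local id = identifier.phone_number('202-555-0165')
-- id.hash is the md5 hex of the msgpack-encoded {'phone_number', '202-555-0165'};
-- id.data keeps the original data for collision checks on the storage side
print(id.hash)
print(identifier.cmp_data.phone_number(id.data, {'phone_number', '202-555-0165'}))  -- true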
Read
While searching, we:
• calculate the hash of the search data;
• use it to find data in user_search_index;
• send a request to the storage and get the data by primary key.
Example code for searching by phone number:
-- api.lua
local errors = require('errors')

local M = {}

function M.find_users_by_phone_number(phone_number)
    local user_ids, err = M.search_index.user_id.get_by_phone_number(phone_number)
    if err ~= nil then
        err = errors.wrap(err)
        return nil, err
    end
    return get_users_by_ids(user_ids)
end

-- search_api.lua
local errors = require('errors')

local M = {}
M.user_id = {}

-- M.identifier, M.vshard_router, and M.vshard_timeout are assumed
-- to be set up during role initialization
function M.user_id.get_by_phone_number(phone_number)
    local identifier = M.identifier.phone_number(phone_number)
    -- the search layer is sharded by the hash of the search data
    local bucket_id = M.vshard_router.bucket_id_mpcrc32(identifier.hash)
    local ids, err = M.vshard_router.callrw(bucket_id, 'search_storage_api.user_id.get_by_phone_number',
        {identifier.hash, identifier.data}, {timeout = M.vshard_timeout}
    )
    if err ~= nil then
        err = errors.wrap(err)
        return nil, err
    end
    return ids
end
-- search_storage_api.lua
local errors = require('errors')

-- assumption: the not-found error class is defined like this in the real module
local not_found_err = errors.new_class('NotFoundError')

local M = {}
M.user_id = {}

local function get_users_by_hash(hash, data, cmp_func)
    local result = {}
    for _, t in box.space.user_search_index.index.hash:pairs({hash}, {iterator = 'EQ'}) do
        if t.data_hash ~= hash then
            break
        end
        -- compare the stored data with the search data to filter out hash collisions
        if cmp_func(t.data, data) then
            result[#result + 1] = t.user_id
        end
    end
    if #result == 0 then
        return nil, not_found_err:new()
    end
    return result
end

function M.user_id.get_by_phone_number(hash, data)
    return get_users_by_hash(hash, data, M.cmp_data.phone_number)
end
Here we managed without MapReduce: in the case of unique data, we make one network request to find the primary key and one more to get the desired data.
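The get_users_by_ids helper used in api.lua is not shown above. A possible sketch, an assumption rather than the application's actual code, simply repeats the search-by-id pattern for every found id:
-- router side; a hypothetical get_users_by_ids
local vshard = require('vshard')

local function get_users_by_ids(user_ids)
    local result = {}
    for _, user_id in ipairs(user_ids) do
        local bucket_id = vshard.router.bucket_id(user_id)
        local user, err = vshard.router.callrw(bucket_id, 'get_user_by_id',
            {user_id}, {timeout = 1})
        if err ~= nil then
            return nil, err
        end
        table.insert(result, user)
    end
    return result
end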
Indexing arbitrary data
The index layer allows you to move indexing from the database level to the application code level. This opens up great possibilities for indexing arbitrary data. For example, it is quite easy to make a composite index based on name and date of birth.
-- identifier.lua
local digest = require('digest')
local msgpack = require('msgpack')

local M = {}
M.cmp_data = {}

local Identifier = {}

function Identifier:new(data)
    local obj = {}
    obj.hash = digest.md5_hex(msgpack.encode(data))
    obj.data = data
    return obj
end

...

function M.name_birthdate(name, birthdate)
    local data = {'name', name, 'birthdate', birthdate}
    return Identifier:new(data)
end

function M.cmp_data.name_birthdate(data_one, data_two)
    return #data_one == 4 and #data_two == 4 and
        data_one[1] == data_two[1] and
        data_one[2] == data_two[2] and
        data_one[3] == data_two[3] and
        data_one[4] == data_two[4]
end

return M
The rest is identical to searching by phone number.
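For completeness, the storage-side entry point is a one-line wrapper in the same style as the phone number search:
-- search_storage_api.lua
function M.user_id.get_by_name_birthdate(hash, data)
    return get_users_by_hash(hash, data, M.cmp_data.name_birthdate)
end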
Write
Earlier, I described how to implement search using the index layer. Now let's look at how the layer is built. When you create or update data, you must also create or update the index layer. How exactly this happens depends on the specific task, but most likely it will run in the background. You can use the Tarantool/queue and Tarantool/sharded-queue modules for this. This example has a naive implementation of building the index layer with Tarantool/queue.
Implementing the index layer complicates data writing and updating, but significantly speeds up reading. In our case, working with the index layer when writing data looks like this:
The index layer is updated in the background, after the data itself has been updated. So in some cases a search may return nothing even though the data has already been saved to the database. Or, after an update, a search by the old values may return the new data: the record has been updated, but the search layer has not. Eventually, though, the data becomes consistent. Depending on your case, you may need the opposite logic: first update the index layer, then the data.
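To make the write path concrete, here is a minimal sketch of such a background update with the Tarantool/queue module. The tube name, the task format, and the search_storage_api.put_index_entry function are assumptions for illustration; the naive implementation linked above may differ:
-- router side; a naive sketch using tarantool/queue, not the application's actual code
local queue = require('queue')
local fiber = require('fiber')
local digest = require('digest')
local msgpack = require('msgpack')
local vshard = require('vshard')

-- a FIFO tube with pending index-layer updates
queue.create_tube('search_index_updates', 'fifo', {if_not_exists = true})

-- called after the user record itself has been written
local function enqueue_index_update(user_id, phone_number)
    queue.tube.search_index_updates:put({user_id, {'phone_number', phone_number}})
end

-- background fiber: take tasks and write them to user_search_index on the proper shard
local function index_worker()
    while true do
        local task = queue.tube.search_index_updates:take(1)
        if task ~= nil then
            local user_id, data = task[3][1], task[3][2]
            local hash = digest.md5_hex(msgpack.encode(data))
            local bucket_id = vshard.router.bucket_id_mpcrc32(hash)
            local _, err = vshard.router.callrw(bucket_id,
                'search_storage_api.put_index_entry',  -- hypothetical storage function
                {user_id, bucket_id, hash, data}, {timeout = 1})
            if err == nil then
                queue.tube.search_index_updates:ack(task[1])
            else
                queue.tube.search_index_updates:release(task[1])
            end
        end
    end
end

fiber.create(index_worker)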
Conclusion
Index layer:
• Significantly reduces the number of network read requests when searching for unique data.
• Lets you avoid storing derived data solely for search purposes.
• Helps implement quick search for any unique data.
You can download Tarantool on the official website and get help in the Telegram chat.