
How to build a high-performance application on Tarantool from scratch

October 30, 2020 | Posted In: Advanced Level, DevOps, Open Source Databases, Tools


I joined Mail.ru Group in 2013 and needed a queue for one task. First, I decided to check what the company already had. I was told about a product called Tarantool; I looked at how it worked and decided that adding a queue broker to it could work perfectly well.

I contacted Kostja Osipov, the senior expert in Tarantool, and the next day he gave me a 250-line script that handled almost everything I needed. Since that moment, I have been in love with Tarantool. It turned out that a small amount of code in a fairly simple scripting language could give this DBMS entirely new capabilities.

Today, I’m going to tell you how to instantiate your own queue in Tarantool 2.2.

At that time, I liked Beanstalkd, a simple and fast queue broker. It offered a user-friendly interface, task status tracking by connection (a client’s disconnection returned its tasks to the queue), and convenient handling of delayed tasks. I wanted to make something like that.

Here is how the service works: there is a queue broker that accepts and stores tasks, and there are clients: producers that send tasks (the put method) and consumers that take them (the take method).


This is what the lifecycle of a task looks like. A task is sent with the put method and goes to the ready state. The take operation changes its status to taken. A taken task can be acknowledged (ack) and removed, or released (release) and returned to ready.


Processing of delayed tasks can be added: a task put with a delay first goes to a waiting state and becomes ready once the delay expires.

Environment setup

Today, Tarantool is also a LuaJIT interpreter. To start working with it, you need an entry point: an initial file, init.lua. After that, box.cfg() must be called; it starts the DBMS internals.

For local development, you only need to load the console module and start it. Then create and run a file as follows:
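A minimal init.lua in this spirit might look like the sketch below. The empty box.cfg{} relies on default settings, which is my assumption for a local experiment, and the strict mode line can be dropped if you don't want it.

```lua
-- init.lua: entry point of the application (a minimal sketch)
require('strict').on()      -- complain about undeclared global variables

box.cfg{}                   -- start the DBMS internals with default settings

require('console').start()  -- open an interactive console
os.exit()                   -- leave once the console is closed
```

The file is then passed to the tarantool binary as its only argument.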

The console is interactive and can be put to use instantly. There is no need to install or set up multiple tools and learn to use them. All it takes is 10 to 15 lines of code on any local machine.

Another piece of advice: use strict mode. Lua is quite lax about variable declarations, and this mode will help you catch some of the resulting errors. If you build Tarantool yourself in DEBUG mode, strict mode is on by default.

Let’s run our file with Tarantool:

You’ll see something like:

Writing a queue

Let’s create a file queue.lua to hold our app. We could put all of it right into init.lua, but a separate file is more convenient.

Now, load the queue as a module from the init.lua file:

All the following modifications will be made in queue.lua.

As we’re making a queue, we need a place to store the task data. Let’s create a space, that is, a data table. It can be created without options, but we’ll add a few right away. To survive regular restarts, the space must be created only if it doesn’t exist yet (if_not_exists). Also, in Tarantool you can specify the field format with a description of the content (and it is a good idea to do so).

I’m going to take a very simple structure for the queue. I’ll need only task ids, their statuses, and some arbitrary data. Data can’t be stored without a primary index, so we create one on id. Make sure the field type in the format matches the type in the index.
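The schema described above could be sketched as follows. The space name queue appears later in the article; the index name primary and the exact field types are my assumptions.

```lua
box.cfg{}  -- changes nothing if the instance is already configured

-- Create the space only if it does not exist yet, so restarts are safe.
box.schema.space.create('queue', { if_not_exists = true })

-- Describe the fields; the type of `id` must match the index part below.
box.space.queue:format({
    { name = 'id',     type = 'number' },
    { name = 'status', type = 'string' },
    { name = 'data',   type = 'any'    },
})

-- Primary index on the first field, the task id.
box.space.queue:create_index('primary', {
    parts = { 1, 'number' },
    if_not_exists = true,
})
```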

Then, we make a global queue table that will contain our functions, attributes, and methods. First of all, we expose two functions: putting a task (put) and taking a task (take).

The queue will track the state of each task. For the statuses, we’ll make another table. Numbers or strings can be used as values, but I like one-character codes: they can be semantically meaningful, and they take little space to store. First of all, we create two statuses: R=READY and T=TAKEN.

How do we make put? Easy as pie. We need to generate an id and insert the data into the space with the READY status. There are many ways to generate an identifier, but we’ll take clock.realtime. It automatically defines the order of messages in the queue. However, remember that the clock can be adjusted, which may produce a wrong message order. Another issue is that a task with the same id may already be in the queue. You can check whether a task with the same id exists, and in case of a collision just add one. This takes microseconds, and the situation is highly unlikely, so efficiency won’t be affected.

All the arguments of the function are stored in the task:
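A put function along these lines might look like the sketch below. Taking the id from clock.realtime64() in microseconds is my assumption (the text only says the clock is used), and the schema is repeated so that the snippet runs on its own.

```lua
local clock = require 'clock'

box.cfg{}
box.schema.space.create('queue', { if_not_exists = true })
box.space.queue:create_index('primary', { parts = { 1, 'number' }, if_not_exists = true })

local STATUS = { READY = 'R', TAKEN = 'T' }

-- The global queue table; rawget/rawset keep strict mode quiet.
local queue = rawget(_G, 'queue') or {}
rawset(_G, 'queue', queue)

-- Wall-clock time in microseconds; on a collision step forward by one.
local function gen_id()
    local id = clock.realtime64() / 1000ULL
    while box.space.queue:get{ id } do
        id = id + 1
    end
    return id
end

-- Every argument of the call becomes part of the task data.
function queue.put(...)
    return box.space.queue:insert{ gen_id(), STATUS.READY, { ... } }
end
```

Calling queue.put('hello', { nested = true }) from the console returns the stored tuple.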

After we’ve written the put function, we can restart Tarantool and call this function right away. The task will be added to the queue as a tuple. We can put arbitrary data and even nested structures into it. The tuples that Tarantool uses to store data are packed in MessagePack, which makes storing such structures easy.

Everything we put remains within the space. We can use space commands to see what we have there:

Now, we need to learn how to take tasks. For this, we make a take function. We take the tasks that are ready for processing, i.e., the ones with the READY status. We could scan the primary key and find the first ready task, but if there are a lot of tasks to be processed, this won’t work well. We’ll need a special index on the status field. One of the main differences between Tarantool and key-value databases is that Tarantool lets you create diverse indexes, almost like in relational databases: on various fields, composite ones, of different kinds.

Then, we create the second index, whose first part is the status field. This is what we search by. The second part is id. It keeps the tasks with the same status in ascending order.

Let’s use predefined functions for our selection. There’s a special iterator that is applied to a space or an index as pairs. We pass a part of the key to it. Here, we deal with a composite index that contains two fields. We use the first one for searching and the second one for ordering. We ask the system to find the tuples whose first index part matches the READY status, and the system returns them ordered by the second part of the index. If we find anything, we take that task, update it, and return it. The update is required to prevent anybody else from taking the same task with another take call. If there are no tasks, we return nil.

Please note that the top level of a tuple in Tarantool is an array. Its fields have no names, only numbers, and that’s why operations like update used to require the field number. Let’s make an auxiliary table that maps field names to numbers. To build it, we can use the format we’ve already written:

For better readability, we can rewrite the index definitions as:

Now we can implement take in full:
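Such a mapping could be built as in the sketch below. The table name F is my choice; in a running instance the same list can be read back with box.space.queue:format().

```lua
-- The field list that is passed to space:format().
local format = {
    { name = 'id',     type = 'number' },
    { name = 'status', type = 'string' },
    { name = 'data',   type = 'any'    },
}

-- F maps a field name to its number and a number back to its name.
local F = {}
for no, def in ipairs(format) do
    F[def.name] = no
    F[no] = def.name
end

print(F.id, F.status, F.data)
```

With it, an update can say F.status instead of a bare 2.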
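A take built on the composite index might look like this sketch. The index name status and the local table F are assumptions; the schema is created in place so that the snippet is self-contained.

```lua
box.cfg{}
box.schema.space.create('queue', { if_not_exists = true })
box.space.queue:create_index('primary', { parts = { 1, 'number' }, if_not_exists = true })
-- Composite index: search by status, keep equal statuses ordered by id.
box.space.queue:create_index('status', {
    parts = { 2, 'string', 1, 'number' },
    if_not_exists = true,
})

local STATUS = { READY = 'R', TAKEN = 'T' }
local F = { id = 1, status = 2, data = 3 }

local queue = rawget(_G, 'queue') or {}
rawset(_G, 'queue', queue)

function queue.take()
    -- Only the first key part is given, so the iterator yields all READY
    -- tuples in ascending id order; we need just the first one.
    for _, t in box.space.queue.index.status:pairs({ STATUS.READY }, { iterator = 'EQ' }) do
        -- Mark the task as taken so that nobody else gets it.
        return box.space.queue:update({ t[F.id] }, { { '=', F.status, STATUS.TAKEN } })
    end
    return nil
end
```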

Let’s check how it works. For this, we’ll put one task and call take twice. If by that moment we have any data in the space, we can clear it with the command box.space.queue:truncate():

The first take will return us the task we’ve put. As soon as we call take for the second time, nil is returned, because there are no more ready-tasks (with R status). To make sure, we run a select command from the space:

The consumer that takes a task must either acknowledge that it has been processed or release it unprocessed. In the latter case, somebody else will be able to take the task. For this, two functions are used: ack and release. They receive the task’s id and look the task up. If its status shows it has been taken, we process it. These functions are really similar: one removes processed tasks, and the other returns them to the ready status.

Let’s see how it works with all four functions. We put two tasks, take the first of them, and then release it. It returns to the R status. The second take call takes the same task. If we acknowledge it, it will be removed. The third take call will take the second task. The order is preserved. Once a task has been taken, it isn’t available to anybody else.

This is a properly working queue. We are already capable of writing a consumer to process the tasks. However, there is a problem. When we call take, the function instantly returns either a task or nil. If we write a loop for task processing and start it, it will spin unproductively, doing nothing and simply wasting the CPU.
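The two functions could be sketched as follows. The helper get_taken and the error messages are hypothetical names of mine; field numbers 1 and 2 stand for id and status.

```lua
box.cfg{}
box.schema.space.create('queue', { if_not_exists = true })
box.space.queue:create_index('primary', { parts = { 1, 'number' }, if_not_exists = true })

local STATUS = { READY = 'R', TAKEN = 'T' }

local queue = rawget(_G, 'queue') or {}
rawset(_G, 'queue', queue)

-- Find a task by id and make sure that it is currently taken.
local function get_taken(id)
    local t = box.space.queue:get{ id }
    if not t then error('task not found', 2) end
    if t[2] ~= STATUS.TAKEN then error('task is not taken', 2) end
    return t
end

-- The task has been processed: remove it.
function queue.ack(id)
    local t = get_taken(id)
    return box.space.queue:delete{ t[1] }
end

-- The task has not been processed: make it available again.
function queue.release(id)
    local t = get_taken(id)
    return box.space.queue:update({ t[1] }, { { '=', 2, STATUS.READY } })
end
```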

To fix this, we’ll need a primitive called a channel. It enables message passing; in fact, it’s a FIFO queue for communication between fibers. We have a fiber that puts the tasks when we access the database over the network or via the console. Our Lua code is executed in that fiber, and it needs some primitive to inform another fiber, which is waiting for a task, that a new one is available.

This is how a channel works: it can contain a buffer with N slots where messages can be placed even if no one is reading the channel. Another option is a channel without a buffer: then a message is accepted only if somebody is already waiting for it. Let’s say we create a channel with a buffer of two elements. It has two slots for put. If one consumer is waiting on the channel, that makes a third slot for put. If we send messages via this channel, three put operations will go through without blocking, but the fourth put will block the fiber that sends messages via this channel. This is how inter-fiber communication is set up. If you happen to be familiar with channels in Go, these are essentially the same.
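The slot counting can be checked with a small sketch like the one below. A zero timeout is passed to put so that a full channel reports failure instead of blocking; the variable names are mine.

```lua
local fiber = require 'fiber'

local ch = fiber.channel(2)        -- a buffer with two slots

-- With a zero timeout put() does not block: it returns false on failure.
local first  = ch:put('a', 0)      -- fits into the buffer
local second = ch:put('b', 0)      -- fits into the buffer
local third  = ch:put('c', 0)      -- the buffer is full and nobody is reading

-- An unbuffered channel accepts a message only when a reader is waiting.
local unbuffered = fiber.channel(0)
fiber.create(function() unbuffered:get() end)  -- a consumer now waits here
local delivered = unbuffered:put('x', 0)

print(first, second, third, ch:count(), delivered)
```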

Let’s slightly modify the take function. First of all, we add a new argument, timeout, meaning that we’re ready to wait for a task for a set period of time. We make a loop that searches for a ready task. If none is found, the loop computes how long it still has to wait.

Now, let’s make a channel that will wait along with this timeout. While the fiber is pending at the channel (asleep), it can be woken up externally by sending a message via the channel.

Altogether, take tries to take a task and, if it succeeds, returns it. If there is no task, it waits for the rest of the timeout. Besides, the other party that creates a task will be able to wake this fiber up.

To make testing more convenient, we can load the fiber module globally in the init.lua file:
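A take with a timeout might be sketched like this. The channel field queue.wait and the helper take_ready are names I made up; the deadline arithmetic uses fiber.time().

```lua
local fiber = require 'fiber'

box.cfg{}
box.schema.space.create('queue', { if_not_exists = true })
box.space.queue:create_index('primary', { parts = { 1, 'number' }, if_not_exists = true })
box.space.queue:create_index('status', { parts = { 2, 'string', 1, 'number' }, if_not_exists = true })

local STATUS = { READY = 'R', TAKEN = 'T' }

local queue = rawget(_G, 'queue') or {}
rawset(_G, 'queue', queue)
queue.wait = fiber.channel(0)  -- producers can wake consumers through it

-- Take the first READY task right now, or return nil.
local function take_ready()
    for _, t in box.space.queue.index.status:pairs({ STATUS.READY }, { iterator = 'EQ' }) do
        return box.space.queue:update({ t[1] }, { { '=', 2, STATUS.TAKEN } })
    end
    return nil
end

function queue.take(timeout)
    local deadline = fiber.time() + (timeout or 0)
    while true do
        local task = take_ready()
        if task then return task end
        local left = deadline - fiber.time()
        if left <= 0 then return nil end
        -- Sleep until somebody sends a message or the time runs out.
        queue.wait:get(left)
    end
end
```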

Let’s see how this works without waking the fiber up. In a separate fiber, we put a task with a 0.1 s delay, i.e., at first the queue is empty, and the task appears 0.1 s after the start. Then we call take with a 3 s timeout. After the start, take tries to find a task, and since there is none, it goes to sleep for 3 s. After 3 s it wakes up, searches again, and finds the task.

Now, let’s make take wake up when a task appears. For this, we take our old put function and update it to send a message via the channel. The message can be literally anything. Let it be true here.

Previously, I demonstrated that put can block if the channel has no free slot. At the same time, the task producer doesn’t care whether there are consumers on the other side. It shouldn’t get blocked while waiting for a consumer. So, it’s reasonable to set a zero timeout for blocking here. If there are some consumers out there, i.e., the ones who need to be told about the new task, we’ll wake them up. Otherwise, the message simply won’t be sent via the channel. An alternative option is to check whether the channel has any active readers.

Now the take code works in a totally different way. We create a task after 0.1 s, and take instantly wakes up and receives it. We’ve got rid of the busy loop that kept spinning while waiting for tasks. If we don’t put a task, the fiber will wait for 3 seconds.

We’ve tested how things work within the instance, and now let’s try some networking. First of all, let’s create a server. For that, we add the listen option to box.cfg in our init.lua file (it is the port to listen on). At the same time, we’ll need to grant permissions. Right now we’re not going to study privilege setup in detail, but let’s give every connection the execute privilege. For details on access rights, see the Tarantool documentation on access control.

Let’s create a producer client for task generation. Tarantool already has a module that facilitates connection to another Tarantool.

The consumer will connect, call take with a timeout, and process the result. If it receives the task, we’ll print or release it but won’t process it yet. Let’s say, we’ve received the task.

But when we try to release the task, something odd happens:

Let’s delve into this matter. When the consumer runs again, we see that on the previous run it took the task but wasn’t able to return it. An error occurred, and the task got stuck. Such tasks become unavailable to other consumers, and there is nobody to return them, as the code that took them has already finished.

select shows that the tasks have been taken.

We have several issues at once here. Let’s start with the automatic release of the tasks in case the client disconnects.

Tarantool contains triggers for client connection and disconnection. If we add them, we’ll be able to learn about connection and disconnection events.

There is a notion of a session id, and we can check the IP address used to connect, as well as the time of disconnection. However, calling session.peer() actually calls getpeername(2) directly on the socket. That’s why at disconnection we can’t see who has disconnected (getpeername is called on a closed socket). Let’s do some minor hacking, then. Tarantool has box.session.storage, a temporary table in which you can save anything you wish for the lifetime of the session. At connection, we can remember who has connected, so that we know who has disconnected. This will make debugging easier.
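The trick with box.session.storage could be sketched as follows. The storage key peer and the log messages are my assumptions.

```lua
local log = require 'log'

box.cfg{}

box.session.on_connect(function()
    -- The socket is still open here, so the peer address is available.
    box.session.storage.peer = box.session.peer()
    log.info('connected %s from %s', box.session.id(), box.session.storage.peer)
end)

box.session.on_disconnect(function()
    -- The socket is already closed; rely on the value saved at connection.
    log.info('disconnected %s from %s', box.session.id(), box.session.storage.peer)
end)
```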

So, we have a client disconnection event. And we need to somehow release the tasks it has taken. Let’s introduce the term “possession of the task.” The session that has taken the task ought to answer for it. Let’s make two tables to save these data and modify the take function:

We’ll use this table to remember that a certain task has been taken by a certain session. We’ll also need to modify the code that returns tasks, ack and release. Let’s make a single common function to check that the task exists and that it has been taken by this specific session. Then it will be impossible to take a task over one connection and then, over another one, release it or request its deletion as processed.

Now ack and release functions become very simple. We use them to call get_task, which checks if the task is possessed by us and if it is taken. Then we can work with it.

To reset the status of all the tasks to R, an SQL or Lua snippet can be used:

When we call the consumer again, it replies: task ID required.

Thus, we’ve found the first problem in our code. When we work in Tarantool, a tuple is always associated with the space. The latter has a format, and the format has field names. That’s why we can use field names in a tuple. When we take it beyond the database, a tuple becomes just an array with a number of fields. If we refine the format of return from the function, we’ll be able to return not tuples, but objects with names. For this, we’ll apply the method :tomap{ names_only = true }:
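The conversion could be sketched like this; the sample tuple is made up, and the format is set in place so that field names are known to the space.

```lua
box.cfg{}
box.schema.space.create('queue', { if_not_exists = true })
box.space.queue:format({
    { name = 'id',     type = 'number' },
    { name = 'status', type = 'string' },
    { name = 'data',   type = 'any'    },
})
box.space.queue:create_index('primary', { parts = { 1, 'number' }, if_not_exists = true })

local tuple = box.space.queue:replace{ 1, 'R', { 'payload' } }

-- A plain Lua table keyed by field names only, safe to send to a client.
local task = tuple:tomap({ names_only = true })

print(task.id, task.status, task.data[1])
```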

Having replaced it, we’ll encounter another issue.

If we try to release the task the system will answer that we haven’t taken it. Moreover, we will see the same ID, but with a suffix – ULL.

Here we run into a peculiarity of a LuaJIT extension: FFI (Foreign Function Interface). Let’s delve into this matter. We add five values to a table, using different ways of writing the number 1 as the keys.

We might expect the table to end up with 2 keys (string + number) or 3 (string + number + LL). But when displayed, all the keys appear in the table separately: we still see all the values, 1, 2, 3, 4, 5. Moreover, after serialization we won’t see any difference between regular, signed, or unsigned numbers.

However, the most amusing thing happens when we try to extract data from the table. It goes well with regular Lua types (number and string), but it doesn’t with LL (long long) and ULL (unsigned long long). They belong to a separate type, cdata, intended for working with C language types. When stored in a Lua table, cdata is hashed by address, not by value. Two numbers, even if they are the same in value, simply have different addresses. And when we add a ULL key to the table, we can’t extract it using the same value.
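The experiment can be reproduced with a sketch like the one below (it needs LuaJIT, and therefore Tarantool, because of the LL and ULL literals).

```lua
local t = {}
t[1]    = 1   -- Lua number
t['1']  = 2   -- string
t[1LL]  = 3   -- int64 cdata
t[1ULL] = 4   -- uint64 cdata
t[1ULL] = 5   -- another uint64 cdata object with the same value

local count = 0
for _ in pairs(t) do count = count + 1 end

print(count)            -- how many separate keys the table holds
print(t[1], t['1'])     -- plain Lua types are looked up by value
print(t[1LL], t[1ULL])  -- a fresh cdata object is a different key
```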

That’s why we’ll have to change our queue and the way task possession is keyed a bit. This is a forced move, but it will allow arbitrary changes to our keys in the future. Somehow, we need to transform our key into a string or a number. Let’s take MessagePack. In Tarantool, it’s used to store tuples, and it will pack our values just like Tarantool itself does. With it, we’ll transform an arbitrary key into a string that will become a key in our table.

Then we add key packing to take and save the packed key into the table. In the function get_task we need to check that the key has been passed in the correct format, and if not, we convert it to int64. After that, we use keypack to pack the key into MessagePack. As this packed key will be required by all the functions that use it, we’ll return it from get_task, so that ack and release can use it and clean it out of the sessions.

As we have a disconnection trigger, we know that a certain session has disconnected: the one possessing certain keys. We can take all the keys from that session and automatically return them to their initial state, ready. Besides, this session may still have take calls waiting for a task. Let’s mark the session in session.storage so that no tasks are handed to them.
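A key-packing helper could be as small as the sketch below. The table taken and the owner label are hypothetical; the point is only that the packed key is a string, which Lua compares by value.

```lua
local msgpack = require 'msgpack'

-- Pack any key into a MessagePack string.
local function keypack(key)
    return msgpack.encode(key)
end

local taken = {}
taken[keypack(1ULL)] = 'session 7'  -- hypothetical owner of task 1

-- A different cdata object with the same value now finds the entry.
print(taken[keypack(1ULL)])
```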

For testing purposes, tasks can be taken as a group:

During debugging, you might notice that if you take some tasks and then stop the queue, at restart the tasks aren’t possessed by anybody (because all connections were interrupted when you switched off), yet they still have the taken status. That’s why we’ll add status modification at startup to our code. This way, the database will start by releasing all the taken tasks.

Now we have a queue ready for operation.

Adding delayed processing

Now we only have to add delayed tasks. For that, let’s add a new field and a corresponding index. We’re going to use this field to store the time when a task’s state should change. To this end, we’re modifying the put function and adding a new status: W=WAITING.

As we are changing the schema, and as this is development mode, let’s drop the previous schema (via the console):

Now, let’s restart our queue.

Then, we add support for delay in put and release. If delay is passed, the task’s status is set to WAITING, and we have to define when it becomes due for processing. Another thing we need is a processor. For this, we can use background fibers. At any moment we can create a fiber that isn’t associated with any connection and works in the background. Let’s make a fiber that runs forever and waits for the nearest tasks.

When the time comes for a task, we modify it, changing its status from waiting to ready, and notify the clients that might be waiting for a task.

Now, we put a delayed task. We call take and make sure that there are no ready tasks. We call it again with a timeout that covers the moment the task appears. As soon as it appears, we see that this was done by the queue.runat fiber.

Monitoring

Never forget about monitoring the queue, because it can grow too large or even overflow. We can count the number of tasks with every status in the queue and start sending the data to monitoring.

Such monitoring will work quite fast as long as there are not too many tasks. The normal state of the queue is empty. But suppose we have a million tasks. Our stats function still shows the correct value but works rather slowly. The cause is the index:count call, which is always a full scan of the index. Let’s cache the values of the counters.

Now, this function will work very fast regardless of the number of records. We only have to update the counters on every operation: each time, we decrease one value and increase another. We could update the counters manually inside each function, but errors and inconsistencies are possible. Luckily, Tarantool has triggers for spaces that can trace any change in the space. You can even manually execute space:update or space:delete, and the trigger will take that into account, too. The trigger accounts for all the statuses according to the values stored in the database. At restart, we compute the values of all the counters once.
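Cached counters driven by a space trigger might be sketched as follows. The names counters and count are mine; field 2 is the status.

```lua
box.cfg{}
box.schema.space.create('queue', { if_not_exists = true })
box.space.queue:create_index('primary', { parts = { 1, 'number' }, if_not_exists = true })

local counters = {}  -- status -> number of tasks with that status

local function count(status, delta)
    counters[status] = (counters[status] or 0) + delta
end

-- Count what is already stored, once at startup.
for _, t in box.space.queue:pairs() do
    count(t[2], 1)
end

-- Keep the counters in sync with every change of the space.
box.space.queue:on_replace(function(old, new)
    if old then count(old[2], -1) end  -- the tuple leaves its old status
    if new then count(new[2], 1) end   -- and enters the new one
end)
```

An insert passes old as nil and a delete passes new as nil, which is why both are checked.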

There is one more operation that can’t be traced in the space directly but affects its content: space:truncate(). To monitor the clearing of the space a special space trigger can be used — _truncate.

After that everything will work accurately and consistently. Statistics can be sent over the network. Tarantool has convenient non-blocking sockets that can be used rather low-level, almost like in C.

To demonstrate how it works, let’s send the metrics in the Graphite format using UDP:
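Sending over UDP could look like the sketch below. The host 127.0.0.1, the port 2003, the metric prefix, and the sample values are all assumptions; the line layout follows the Graphite plaintext protocol.

```lua
local socket = require 'socket'

-- Graphite plaintext protocol: "<path> <value> <unix time>\n" per metric.
local function graphite_lines(prefix, counters, now)
    local lines = {}
    for name, value in pairs(counters) do
        lines[#lines + 1] = string.format('%s.%s %d %d\n', prefix, name, value, now)
    end
    table.sort(lines)  -- stable output regardless of table order
    return table.concat(lines)
end

local payload = graphite_lines('queue.stats', { ready = 3, taken = 1 }, os.time())

local sock = socket('AF_INET', 'SOCK_DGRAM', 'udp')
if sock then
    sock:sendto('127.0.0.1', 2003, payload)  -- fire and forget
    sock:close()
end
```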

or using TCP:

Hot code reloading

An important feature of the Tarantool platform is hot code reloading. It is rarely required in regular apps, but when you have gigabytes of data stored in the database and every restart takes time, hot reloading is quite helpful.

When Lua loads some code on require, the content of the file is interpreted, and the returned result is cached in the system table package.loaded under the module’s name. Subsequent require calls for the same module won’t read the file again but will return the cached value. To make Lua reinterpret and reload the file, you just have to delete the relevant record from package.loaded[…] and call require again. You need to remember what the runtime has preloaded, because there are no files to reload built-in modules from. The simplest code snippet for reload handling looks like:

As code reload is a typical and regular task, we already have a ready-made module, package.reload, which we use in most of the apps. It remembers the file the app was loaded from and the modules that were preloaded, and then provides a convenient call to initiate a reload: package.reload().

To make the code reloadable, you should write it in a slightly different way. Mind that the code can be executed repeatedly: it runs at the first start and then again at every reload. We have to handle this situation explicitly.
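The idea could be sketched as below. The function name reload and the demo module are mine; the demo is registered through package.preload only so that the snippet runs without extra files.

```lua
-- Remember what was loaded before the application code: built-in modules
-- have no files behind them and must survive a reload.
local preloaded = {}
for name in pairs(package.loaded) do
    preloaded[name] = true
end

-- Forget every module loaded later and interpret the entry point again.
local function reload(entry)
    for name in pairs(package.loaded) do
        if not preloaded[name] then
            package.loaded[name] = nil
        end
    end
    return require(entry)
end

-- Demo: a module whose loader counts how many times it has been run.
local loads = 0
package.preload['demo_module'] = function()
    loads = loads + 1
    return { version = loads }
end

require('demo_module')
require('demo_module')                -- served from package.loaded
local fresh = reload('demo_module')   -- the loader runs once more
print(loads, fresh.version)
```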

Besides, you need to remember about trigger reloading. If you leave the issue as is, every reload will cause the installation of an additional trigger. However, triggers support an indication of the old function, so the installation of the trigger returns it. That’s why we’ll just save the installation result to a variable and pass it on as an argument. At the first start, there will be no variable, and a new trigger will be installed. However, at subsequent loading, the trigger will be replaced.

Another essential element of reloading is fibers. A fiber is started in the background, and we don’t control it in any way. It has while true written in it, so it never stops and doesn’t reload on its own. To communicate with it, we’ll need a channel, or rather a fiber.cond: a condition variable.

There are several different approaches to fiber reload. For example, the old ones can be stopped with the fiber.kill call, but this is not a very reliable approach, as we may call kill at the wrong time. This is why we usually use a fiber generation attribute: the fiber keeps working only within the generation it was created in. At code reload, the generation changes and the fiber exits cleanly. Moreover, we can prevent several fibers from running simultaneously by checking the status of the previous generation’s fiber.
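The generation approach might be sketched like this. The table state and the function start_worker are hypothetical; in a real app the state would have to live somewhere that survives the reload.

```lua
local fiber = require 'fiber'

local state = { generation = 0 }

local function start_worker()
    state.generation = state.generation + 1
    local generation = state.generation  -- the generation this fiber belongs to
    return fiber.create(function()
        while state.generation == generation do
            -- one step of background work would go here
            fiber.sleep(0.01)
        end
        -- a newer generation has been started: leave quietly
    end)
end

local old = start_worker()
local new = start_worker()  -- simulates a code reload
fiber.sleep(0.05)           -- give the old fiber time to notice

print(old:status(), new:status())
```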

And in the end, at code reload you get an error saying the console is already on. This is how this situation can be dealt with:

Let’s summarize

We’ve written a working network queue with delayed processing, automatic task return by means of triggers, and statistics forwarding to Graphite using TCP, and we’ve explored quite a few issues along the way. On average modern hardware, such a queue will easily handle 20+ thousand messages per second. It contains about 300 lines of code and can be written in a day, including reading the documentation.

Final files:

queue.lua:

init.lua:

 

Mons Anderson

Architect of the S3 compatible object storage by Mail.ru Cloud Solutions. Technical product manager at Tarantool. 20+ years of experience in programming, 8+ years in high-performance applications based on Tarantool.
