Ethereum has a very well defined set of methods that can be used to interact with the blockchain from external systems (probably from our current Web 2.0 systems). Ethereum exposes these functionalities over JSON-RPC 2.0 protocol. In brief, JSON-RPC is a light-weight data interchange protocol built on top of application layer protocols like HTTP and Web Sockets. Using JSON-RPC we specify the method we want to invoke and the list of parameters to be passed, the method is then executed by the Ethereum's JSON-RPC server to which we have connected and returns the output as JSON which can be consumed by the caller. When compared to gRPC and other known RPC protocols JSON-RPC is relatively simple and uses plain text JSON which is more readable and easily understandable. We can see the list of methods provided by Ethereum blockchain via JSON-RPC protocol here.
Here is a sample JSON-RPC payload to get the information of a block on the ethereum blockchain by it's number:
{
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["0xa0f46a", true],
"id": 1
}
We are calling the method eth_getBlockByNumber
by passing the block number 10548330
(or 0xa0f46a
in hex) and the boolean value which specifies whether to output full transaction output or just the transaction hashes. Similar to eth_getBlockByNumber
there are many methods with their own parameter requirements.
We can make HTTP POST request to the JSON-RPC node by submitting the above JSON payload. Here is an example of how it can be done with python:
import requests
import time
data = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["0xa0f46a", False],
"id": 1 # id is just a number for keeping track of the request on client-side
}
st = time.time()
response = requests.post("https://rinkeby.infura.io/v3/22b23b601d364f999c0a7cf6deb7bad4", json=data)
et = time.time()
print('time taken: ', et - st, 'seconds')
print('output', response.json())
Output:
time taken: 0.8727474212646484 seconds
output {'jsonrpc': '2.0', 'id': 1, 'result': {'baseFeePerGas': '0xb', 'difficulty': '0x2', 'extraData': '0xd683010a11846765746886676f312e3138856c696e75780000000000000000005484b299653ddb09be6dfcaf019b2d387564cd4eedc3c7a670837fca65feaab251a0a95cb1e14fbb6b7e00680d0318eb03522b7ae5ba0d6623991439b8a257a001', 'gasLimit': '0x1c9c380', 'gasUsed': '0x5c35eb', 'hash': '0x7b5bc82fae1cb1a695e4bea1da405b8b3e953d30ce63167571bb0eba538ec028', 'logsBloom': '0x0828404000000b0000044801a110a92050208002624440000c03384090200008805042185008200410000c0000001a1231408804080cbc0004000a1011363040360000000080206a41a000080022602108060204946410228604805486100000022041002200c003000412101800481080c0002000000601840100d1000802001000a30c06c100100248b00000008040c50000010240010880000044406030380200000008020408488440000580200a3504006408c000188808214e0000120048200d0228044001000200810020404408000680810800140b00110a0480e20122343c0aa290400020ac000081050891001963b0482223400040580980002403', 'miner': '0x0000000000000000000000000000000000000000', 'mixHash': '0x0000000000000000000000000000000000000000000000000000000000000000', 'nonce': '0x0000000000000000', 'number': '0xa0f46a', 'parentHash': '0xc70e1a85c2d7e865ffe71ae1e8c78539ad2d9268b469abfa06554c2c8ac71205', 'receiptsRoot': '0xdec4ee3b890662d88f6635d5ffa05c0aba25731701224735192d076d6fad019c', 'sha3Uncles': '0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347', 'size': '0x61c5', 'stateRoot': '0x9af1e2dee823147a30eff0e66c4498addbc90550aacbd65f184b1fcf13a3a162', 'timestamp': '0x62628dd8', 'totalDifficulty': '0x10a83f5', 'transactions': ['0x8fc61b71557e445c832d0c7ac0d9a19dac13b4cf92a40dd59fa27b1faf3a5781', '0x1598f0db451fd6503bcba735dbf91b785a7aeee4fba37cc83d75508ce242c354', '0xe07bcd936979f10378af2610dc5209656662754c05a501404bd757564f9b7ca8', '0xb579c46b357a344e339bbe695cbeb7d8e80cbf3a48e359c3ac68ec937c2b41bd', '0x2020771e9c08ccae2770393c0c652d5161ae6d276bf5b2509f7e38d5d008a182', '0xca805dfe9fc1e2a13cfbf0cfc6fb3be29e12d60e29deb68140293559a4388384', '0x44b67a281d17279f0cd005b7101bfbbba3300463b5a5d3123eca32e06f0b5810', '0xd990f40743486fba6ce0d187516ad49c6cd92d7520ce80447606aea69a86c841', '0x11849bac6d3d20090d8a708cd59aaaef3412a63c4c3f1b26dacef5cc999c5038', '0xfe60a91ac75fe31a8df0acc425c931c8ecaf2692519bb365bf8501def85f5dda', '0x4f4715aa0f544b8c1a2d6229dbe683d2814081d8a39609cfcb12451d3c87bb7d', '0x97e439cb02f1e8bf4f7db89e96407e67bbbc75253bf82ebb2aa3e10304376c30', '0x129021ec232712f422089a4fa2bfb7f0c5a58e19ea1bfa8e0c37ed380942e589', '0xca429b804e925e4e28a0132a4ac42b7bd19583193cbaf5da7d8ce956cae4ef5c', '0x12fc6f6277452657d730b5518016ef84281cf5961d3ed4d1a38439a6df64dca9', '0x7e733c1e4f9ce0e0f5a3d7d455ef870588844687f980386f788c378940eaa911', '0x50c86aa5756a3d8bc0282668a9e58378b1297f0be4c7a77aa26ef38aa69698fc', '0xd49611a95ba2eed4426a3942e38d3ff043eba86e171542be04b3a3a550d318d3', '0x0a92aa7a60f1d3b97d86f9665cece2bf85248ac7d5e732547cf33ee17e387f15', '0xa91e155c129b4d8449635aee1c1cbbbde7a7f8108cc45c05aa030fc34b7b16e8', '0xf7a900e2a50400b4bbcb4f6c51256cf9125347c171b107d4a86ff7d9c852b59f', '0x090add0f1d441c97b54f40d8ec54575102f66307fa03e330d44cee6bb6a1b69c', '0xad8869d19e73ee2f57e4f0d3697a58bf902ad03cea3c1c7b7cf40c9e621ffaeb', '0x022d10b70c903220475bab28d412153a47514abed92288bfbe98224dc8e7128c', '0x367cec4698f401f166864b485d21c9aedfd26f8d28d512cd15bcd7ddb4f7e86d', '0x6ba8af557b1931d5d0e095f4ca04ccc31e9a96c03596b59264c7d894814fafef', '0x1de3719a13d7141ac9db10668b29921df6dce2d3b8ee5c6307b9d19fa9b67e8a', '0x13928fe6d5af70bff7773ecf2c11528a7abcba6b8898bdd72bb68bc6d9bed414', '0x7784245cf801dfcb7c2cbd92abcee9b78e0e4fc02b617d8a2884a9f9ecff878b', '0x6a926356ea4d1198605be6cddab626b9fc5fb3f8934bcb7f0785b42fcdba1866', '0x4046f17200f5c2a316f042d5393bd555f2d05d97615122089a34972f424e676b', '0x81a4036a5e4723d93cd058b63768545444e6b29abee1cb2d9b87b05ee2310927', '0x30a1ec3718a279aec9dc10a1ff3214a2c193e89797152f7e8ab92e6728d02580', '0xb18d695cba61e4d021505bdf001c565458bd25e0f291be34311ac03dc00334e8', '0xca4d12a4b7c51ef30be26fb7489be9c6cfed25603d41f2503b38a98e342ffbfb', '0x86ca6b8b86fc54d017b116e69f188baf3d2c60fbdbb4e4617380a436a1fce767', '0x117b48440bbbdd89edf5776df2e5fed599e4ef69e5bd1405a9cac3b8264faede'], 'transactionsRoot': '0xed1e6d5dd38173c6f6fe0ec25c056a71d3e0ca9c39d1c891b63af8993526c5f5', 'uncles': []}}
This is the most simple way to get started with making JSON-RPC calls. We directly made raw RPC calls on the blockchain, instead of that we can also use full-fledged client libraries avaibale in many languages. For example web3.py is a python implementation of high-level ethereum JSON-RPC calls.
From the above output we came to know that an average web3 call via JSON-RPC takes 900ms (this depends on many factors and is different for different RPC methods) to get data of one block. Imagine we need to query hundreds of such blocks, in that case we will be wasting lot of time (approximately 90s) in our case. To mitigate this issue, we can either go for concurrent JSON-RPC calls using asynchronous networking capabilities or follow JSON-RPC batch specification.
In the following sections we will create a simple client package that can be used to make concurrent and batch JSON-RPC calls.
Preparing for concurrent and batch calls:
In the above example we used requests module to make JSON-RPC calls over HTTP, this is fine but the blocking nature of synchronous HTTP requests becomes a bottleneck when we want to scale for thousands of RPC calls. So instead of requests
module, we will use aiohttp which is built around python's asyncio capabilities. We will first create a class that abstracts away all aiohttp
stuff and provides a cleaner API to use as a python module.
class EthAioAPI:
def __init__(self):
pass
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *err):
await self.session.close()
self.session = None
This class has __aenter__
and __aexit__
methods which will be called upon entry and exit of asyncio
scope respectively, we will use these methods to manage our session object which will be created and closed automatically as we enter and leave the scope.
We want to pass in the URL to which we want to connect in the constructor. Our API should accept N tasks and execute them together as batch or concurrently. To accommodate these features, we modify the above code as follows:
class EthAioAPI:
def __init__(self, url: str, max_tasks=100):
self.url = url
self.current_tasks = []
self.current_id = 0
self.max_tasks = max_tasks
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *err):
await self.session.close()
self.session = None
def push_task(self, task: dict):
if self.current_id >= self.max_tasks:
raise Exception("maximum tasks exceeded")
payload = {
"jsonrpc": "2.0",
"method": task["method"],
"params": task["params"],
"id": self.current_id
}
self.current_tasks.append(payload)
self.current_id += 1
def set_max_tasks(self, n: int):
self.max_tasks = n
We added few members in the constructor, the url
stores the URL string of the gateway RPC node to which we want to connect, current_tasks
holds the list of tasks submitted via push_task
method and current_id
keeps track of a number that is just incremented every time to assign a new ID for the task. max_tasks
represents how many tasks we can execute together, as you can see a check is made in push_task
method before allowing the task to be added to the list, we can use set_max_tasks
anytime to change this limit. Now that we have necessary structures to hold our tasks, we can go ahead and build functions to execute these tasks together - we can do this in two ways, i.e either concurrently or as a batch.
Making concurrent (asynchronous) requests:
aiohttp
provides us capabilities to execute tasks concurrently, using the asynchronous system calls provided by the operating system, in other words, unlike requests
module, here each call is executed in a non-blocking way without blocking our main thread, thus we can schedule N requests at a time to run in background and await
for results. To achieve this, we create an asynchronous function called tasklet
which makes a single JSON-RPC call without blocking. To run N concurrent requests we call tasklet
N times and they are executed concurrently. Here is our tasklet
function:
async def tasklet(id: int, url: str, session: aiohttp.ClientSession, payload: dict) -> dict:
try:
response = await session.post(
url, json=payload,
headers={"Content-Type": "application/json"}
)
json_resp = await response.json()
json_resp['success'] = True
return json_resp
except Exception as e:
return {"success": False, "exception": e, "id": id}
The tasklet
function makes a web3 call, if there is a failure returns the failure information or it returns the response dictionary by adding a new entry "success": True
to the response. Finally we create a class method exec_tasks_async
which calls a tasklet
for each task submitted using push_task
method.
async def exec_tasks_async(self):
fns = []
for id, task_payload in enumerate(self.current_tasks):
task_fn = EthAioAPI.tasklet(
id, self.url, self.session, task_payload)
fns.append(task_fn)
outputs = await asyncio.gather(*fns, return_exceptions=True)
self.current_tasks.clear()
self.current_id = 0
return outputs
This method pushes tasklet
function for each task to the event loop and calls them together using asyncio.gather()
and gathers the outputs, the outputs are returned in the same order in which the tasks are submitted. Once we receive the outputs we reset our task list so new tasks can be submitted next time.
This method allows us to call N JSON-RPCs at a time without waiting for individual calls to complete. However, in this approach we are using the resources on the client side to open N concurrent sockets and also we end up making lot of Web3 calls which might sometimes get rate-limited.
Making batch request:
As per JSON-RPC spec, we can send a list of RPC calls at a time and expect the results for all the calls from the server at once. This is left to the implementors of the JSON-RPC server, they can choose to implement this functionality or choose to ignore it, however few Ethereum JSON-RPC implementations supports batched calls. We will now add exec_tasks_batch
class method to add Batch calls support in our client:
async def exec_tasks_batch(self):
try:
response = await self.session.post(
self.url, json=self.current_tasks, headers={
"Content-Type": "application/json"}
)
json_resp = await response.json()
self.current_tasks.clear()
self.current_id = 0
return json_resp
except Exception as e:
raise e
As you can see, this method just submits the entire task list as it is and the output we get is a list of responses for each call in the list.
In this method, we will be using the resources of the RPC server to execute N tasks at a time for us, in client side we need only one socket and we open only one connection. (Here we are depending on the server. So make sure the RPC server you are connecting to supports this functionality).
Final code:
import asyncio
import aiohttp
class EthAioAPI:
def __init__(self, url: str, max_tasks=100):
self.url = url
self.current_tasks = []
self.current_id = 0
self.max_tasks = max_tasks
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *err):
await self.session.close()
self.session = None
async def tasklet(
id: int,
url: str,
session: aiohttp.ClientSession,
payload: dict
) -> dict:
try:
response = await session.post(
url, json=payload,
headers={"Content-Type": "application/json"}
)
json_resp = await response.json()
json_resp['success'] = True
return json_resp
except Exception as e:
return {"success": False, "exception": e, "id": id}
def push_task(self, task: dict):
if self.current_id >= self.max_tasks:
raise Exception("maximum tasks exceeded")
payload = {
"jsonrpc": "2.0",
"method": task["method"],
"params": task["params"],
"id": self.current_id
}
self.current_tasks.append(payload)
self.current_id += 1
async def exec_tasks_batch(self):
try:
response = await self.session.post(
self.url, json=self.current_tasks, headers={
"Content-Type": "application/json"}
)
json_resp = await response.json()
self.current_tasks.clear()
self.current_id = 0
return json_resp
except Exception as e:
raise e
async def exec_tasks_async(self):
fns = []
for id, task_payload in enumerate(self.current_tasks):
task_fn = EthAioAPI.tasklet(
id, self.url, self.session, task_payload)
fns.append(task_fn)
outputs = await asyncio.gather(*fns, return_exceptions=True)
self.current_tasks.clear()
self.current_id = 0
return outputs
def set_max_tasks(self, n: int):
self.max_tasks = n
We can create a folder called aio_eth
and copy this code as __init__.py
which allows us to import aio_eth
and use it as a module.
Let's test!
We can use the class we built in the previous sections to test and verify the functionalities, First let's execute a sample JSON-RPC call N times concurrently and check how it works:
import asyncio
# our module
import aio_eth
import time
URL = "https://rinkeby.infura.io/v3/b6fe23ef7add48d18d33c9bf41d5ad0c"
async def query_blocks():
async with aio_eth.EthAioAPI(URL, max_tasks=100) as api:
for i in range(10553978, 10553978 + 70):
api.push_task({
"method": "eth_getBlockByNumber",
"params": [
hex(i), True
]
})
st = time.time()
_ = await api.exec_tasks_async()
et = time.time()
print('time taken: ', et - st, ' seconds')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
result = loop.run_until_complete(query_blocks())
Output:
time taken: 1.5487761497497559 seconds
So it takes us around 1.5 seconds to execute 70 RPC calls asynchronously. Now let's try batch JSON-RPC call:
import asyncio
import aio_eth
import time
URL = "https://rinkeby.infura.io/v3/b6fe23ef7add48d18d33c9bf41d5ad0c"
async def query_blocks():
async with aio_eth.EthAioAPI(URL, max_tasks=100) as api:
for i in range(10553978, 10553978 + 70):
api.push_task({
"method": "eth_getBlockByNumber",
"params": [
hex(i), True
]
})
st = time.time()
_ = await api.exec_tasks_batch()
et = time.time()
print('time taken: ', et - st, ' seconds')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
result = loop.run_until_complete(query_blocks())
Output:
time taken: 3.698002576828003 seconds
To achieve same result using Batch JSON-RPC call it takes us 3.7 seconds, which is twice as that of concurrent JSON-RPC calls. However, this varies across different methods and amount of data the server returns over the network. To achieve the same in sequential way, it would take us around 65-70 seconds, thus using either concurrency or batch mode we can save lot of time and hence we can scale our client to make lot of JSON-RPC calls in less time.
Getting aio_eth package from PyPi
I have published the above code as a PyPi package so people can download and use it. The PyPi package can be found here. We can also use pip
to install this package:
pip install aio-eth
The package is open source and can be found on Github.
Top comments (0)