Python Networking On Steroids
Asynchronicity, Concurrency And Clarity
This is a presentation of one way to write advanced networking software in Python. It tackles difficult topics such as asynchronicity and concurrency. At the same time, code clarity remains a primary motivation. There is a series of code modules intended as illustration of different points, with the final module offered as a starting point for your next ambitious networking project.
The first part of this presentation is maintained as a separate document that can be found here. This is because there is standalone value in the presentation of a clear, concise HTTP API server that is unencumbered by the more advanced issues tackled here. Working through that presentation will introduce relevant background information on the adopted toolset (i.e. Ansar) and will also result in a running network server (i.e. the crunch_server). The latter is used as a test server in this presentation.
Ansar-based applications are supported on several distributions of Linux, macOS 14 and Windows 11. Process orchestration is also supported through the ansar CLI. Due to technical differences between these platforms, the latter is not supported on Windows.
Familiarity with terms such as asynchronous, event-driven, FSM, RPC, dispatching, handlers, callbacks, concurrency and distributed transaction will help when reading this presentation.
Starting With A Plain Client
A good start is to look at a plain, HTTP request-response interaction with a server. Assuming the crunch_server is currently listening at the address 127.0.0.1:5051, copy the following code into crunch_client.py alongside the server file. This gives access to the definition module, crunch_api, that appeared in the separate presentation;
import ansar.connect as ar
from crunch_api import *

# The client object.
class Client(ar.Point, ar.Stateless):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.settings = settings

    def Client_Start(self, message):        # Start the networking.
        host = self.settings.host
        port = self.settings.port
        ipp = ar.HostPort(host, port)
        ar.connect(self, ipp, api_client='/', ansar_server=True)

    def Client_Connected(self, message):
        settings = self.settings
        request = settings.request(settings.x, settings.y)

        def completed_1(response):          # What to do when transaction completes.
            self.complete(response)         # Terminate with the response as the output.

        a = self.create(ar.GetResponse, request, self.return_address)
        self.assign(a, ar.OnCompleted(completed_1))

    def Client_Completed(self, message):
        d = self.debrief(self.return_address)
        if isinstance(d, ar.OnCompleted):
            d(message.value)

    def Client_NotConnected(self, message): # No networking.
        self.complete(message)

    def Client_Stop(self, message):         # Control-c or software interrupt.
        self.abort(ar.Aborted())

# Declare the messages expected by the client object.
CLIENT_DISPATCH = [
    ar.Start,           # Initiate networking.
    ar.Connected,       # Ready to send.
    ar.Completed,
    ar.NotConnected,    # Networking failed.
    ar.Stop,            # Ctrl-c or programmed interrupt.
]

ar.bind(Client, CLIENT_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, host=None, port=None, request=None, x=None, y=None):
        self.host = host
        self.port = port
        self.request = request
        self.x = x
        self.y = y

SETTINGS_SCHEMA = {
    'host': str,
    'port': int,
    'request': ar.Type,
    'x': float,
    'y': float,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Define default configuration and start the client.
factory_settings = Settings(host='127.0.0.1', port=5051, request=Multiply, x=1.5, y=2.25)

if __name__ == '__main__':
    ar.create_object(Client, factory_settings=factory_settings)
To run the client, use the following commands;
$ cd <crunch_server folder>
$ source .env/bin/activate
$ python3 crunch_client.py --debug-level=DEBUG --x=6.0 --y=7.0
Output(value=42.0)
Request values are accepted on the command line using the --member=value notation. The request is successfully submitted and the response printed in a human-readable form.
The crunch_client is structured in a fashion similar to the crunch_server. The technical reasons for this are covered in the description of the crunch_server. In brief, this involves creation of a main, asynchronous object — in this case an instance of Client — using ar.create_object. The Ansar library then supplies a Start notification which the client uses to initiate a network connection. Ansar also supplies the subsequent ar.Connected notification which the client assumes is a good moment to start the request-response exchange.
Arguments passed to the ar.connect function arrange for specific handling of messages passing over the network connection;
- api_client — enable use of the HTTP protocol and set the URI prefix,
- ansar_server — enable the auto-decode of the HTTP response body.
Sending The Request And Receiving A Response
The real work begins when the ar.Connected message is dispatched to the Client_Connected handler. This is a session control message generated by Ansar as part of its networking obligations. A request is constructed using the contents of the self.settings object and sent on its journey using;

a = self.create(ar.GetResponse, request, self.return_address)

Ansar assigns a special address to self.return_address such that it effectively refers to the object at the end of the new connection, in the remote process.
This line of code forms a fresh instance of the ar.GetResponse object, as a child of the Client object. This is an asynchronous object in the same manner as the client object. It is created, sends and receives messages, and eventually completes. Arguments can be passed to the new object (e.g. request and self.return_address) and completion generates a message to the parent, i.e. ar.Completed. That message contains a single output value.
The ar.GetResponse object sends the request to the specified address. The next received message causes it to complete with the received message as its output value.
In Ansar, the self.send(message, address) method is available to all asynchronous objects — the Client object could have called this method directly but instead has delegated this simple action to a child object. The reasons for choosing the ar.GetResponse approach will become clearer in later sections, but most importantly the use of ar.GetResponse creates a unique context for the interaction between the GetResponse and the server. The Client_Connected handler is guaranteed to be called exactly once, but in the more general case a handler may be called repeatedly, creating the potential for another upstream message to arrive before the previous request-response has had a chance to complete. Associating responses with their original requests becomes problematic.
The next line arranges for a callback;

def completed_1(response):          # What to do when transaction completes.
    self.complete(response)         # Terminate with the response as the output.

a = self.create(ar.GetResponse, request, self.return_address)
self.assign(a, ar.OnCompleted(completed_1))

A function is defined and then passed to the construction of an ar.OnCompleted object. That object is saved as the callback associated with the address of the ar.GetResponse object. Execution of the callback occurs in the Client_Completed handler;
def Client_Completed(self, message):
    d = self.debrief(self.return_address)
    if isinstance(d, ar.OnCompleted):
        d(message.value)
The self.debrief function does a lookup for the information saved by the self.assign function. The information is verified and the callback executed. The OnCompleted.__call__ method accepts the result of the completed object (i.e. the ar.GetResponse) and passes this to the saved function as the first parameter. Any additional parameters passed to the OnCompleted.__init__ method are also passed to the saved function.
The calling environment provided by the Client_Completed handler is very close to the environment where the completed_1 function was defined — the two handlers are effectively methods of the same object. This is significant to the successful execution of callbacks.
These details form the basis of truly asynchronous handling of request-response exchanges;
- receive an event, e.g. the network notification,
- create the ar.GetResponse object,
- save a callback for when the object completes,
- execute the callback, passing the result of the ar.GetResponse.
For this initial foray into asynchronous networking, the callback simply terminates the Client using self.complete. Termination of the Client also terminates the process. Passing the response to the self.complete function causes the printing of the calculated value.
Adding --pure-object to the command line causes output of raw JSON. Useful if there is post-processing to do.
Sequences Of Requests
Now consider the scenario where the client needs to make an initial request before the single request that was sent previously;
def Client_Connected(self, message):
    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    def step_1(response, request, server_address):
        a = self.create(ar.GetResponse, request, server_address)
        self.assign(a, ar.OnCompleted(step_2))

    def step_2(response):
        self.complete(response)

    a = self.create(ar.GetResponse, Divide(4.0, 2.0), server_address)
    self.assign(a, ar.OnCompleted(step_1, request, server_address))
This new handler describes a sequence of request-response exchanges. A callback that creates a new ar.GetResponse object (i.e. step_1) is the glue that continues the sequence. Note the passing of additional parameters to the second ar.OnCompleted. These parameters are forwarded when the callback executes and ensure that the proper values are used.
Implementation of the Client_Connected handler may seem fragmented, and understanding how the thread of execution passes through the different steps can take some getting used to. It's worth noting that the collection of code fragments contained within the definition of Client_Connected comprises the implementation of a multi-phase, distributed transaction that is also fully asynchronous.
This style of implementation should be compared with other styles that achieve the same degree of sophistication, especially with respect to asynchronous operation. This refers to operationally equivalent implementations of Client_Connected in languages other than Python, or in Python but without the use of Ansar. Following material will show that this style of implementation supports concurrency with zero additional effort on the part of the developer, and can also operate seamlessly across multiple supporting network services.
By adopting a few layout conventions and with the use of a supporting method, an asynchronous handler can look like this;
def Client_Connected(self, message):
    # Define the multiple parts.
    def begin(request, server_address):
        a = self.create(ar.GetResponse, Divide(4.0, 2.0), server_address)
        self.then(a, step_1, request, server_address)

    def step_1(response, request, server_address):
        a = self.create(ar.GetResponse, request, server_address)
        self.then(a, step_2)

    def step_2(response):
        self.complete(response)

    # Resolve variables.
    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    # Start the transaction.
    begin(request, server_address)
There is a sequence of local function definitions starting with begin and ending with step_2. The former captures the context of a unique asynchronous operation and the latter terminates the instance, by not calling self.then. Calling then is a shorthand for creation of the OnCompleted object and passing it to assign.
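The exact form of that supporting method is not shown above; whether it is supplied by the library or added to the class, its described behaviour is simply to build the OnCompleted and hand it to assign. A minimal sketch, written as a method of the asynchronous object, might look like this;

def then(self, a, f, *args):
    # Save f (plus any extra arguments) as the completion callback
    # for the child object whose address is a.
    self.assign(a, ar.OnCompleted(f, *args))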
The function bodies are restricted to the use of self methods, and the parameters passed to self.then (e.g. pass message, or its members, to ensure the functions use the correct instance values). It is “thread safe” to access self data members as all class handlers — including completions — execute in the same thread throughout the life of the object (e.g. the Client). Take note that those data members may change between the calls to begin, step_1 and step_2, courtesy of other Client handlers.
These are rather technical constraints on the code that can appear in the sequence of functions; coders that make use of local functions will understand the underlying reasons. Alternative strategies were explored but these were generally more heavy-handed, i.e. they required more setup and spread the functions beyond the definition of their “parent” handler. The Ansar library uses the SDL model for asynchronous operation, which goes beyond the Python model (async and await). The SDL model is based around active objects and message passing.
Concurrent Requests
It is significant that no dependency exists between the first request and the second — the response from the first request is not used in creating the second request. This satisfies the basic requirements of concurrency;
def Client_Connected(self, message):
    def begin(request, server_address):
        a = self.create(ar.Concurrently,
            (Divide(4.0, 2.0), server_address),
            (request, server_address),
        )
        self.then(a, step_1)

    def step_1(response):
        self.complete(response[1])

    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    begin(request, server_address)
The changes to implement concurrent operation are trivial. The two uses of ar.GetResponse are replaced with a single use of ar.Concurrently. The two request-response operations are expressed as two tuples on the parameter list and the response is a list of matching length. That list contains the pair of responses associated with the pair of requests. Internally, ar.Concurrently passes the information from its parameter list on to instances of ar.GetResponse.
The ar.Sequentially class has also been made available. It accepts the same parameters as ar.Concurrently but — as you would expect — performs the separate operations one at a time. This arrangement offers no opportunity for custom processing after each operation, implying that the entire sequence could have been performed concurrently. It is provided for scenarios such as the convenient comparison of concurrent vs sequential operation, i.e. verifying and quantifying the benefit of concurrency.
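Because ar.Sequentially accepts the same parameters, the comparison mentioned above amounts to swapping one class name for the other; a minimal sketch, reusing the begin and step_1 functions from the concurrent example;

def begin(request, server_address):
    # The same two operations, now performed one at a time.
    a = self.create(ar.Sequentially,
        (Divide(4.0, 2.0), server_address),
        (request, server_address),
    )
    self.then(a, step_1)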
Sequences, Concurrency And Underlying Truths
The two techniques can be combined into sequences where each step is either an ar.GetResponse or an ar.Concurrently. Of course, really demonstrating this capability requires the presence of multiple sources (i.e. network servers). Before tackling the connection to multiple servers there is an underlying technical issue that must be brought to light.
HTTP is a synchronous or blocking protocol and, courtesy of the parameters passed to the ar.connect function, only HTTP request and response messages are being passed across the network transport. This is demonstrably the case given the use of the curl command as the convenient testing utility for the crunch_server. Ansar does a good job of hiding these kinds of details; the application only ever sees objects such as Multiply and Output.
HTTP requests are discreetly serialized by Ansar (i.e. performed one at a time, not to be confused with generating text representations of application data). Ansar does not send a new request until the response to the previous request has been received. This is great in that it allows for asynchronous code (e.g. ar.Concurrently) within the crunch_client. It’s not so great in that it defeats the point of concurrency.
This situation highlights the reason that Ansar has its own native protocol that it uses for the transfer of messages across network transports. This protocol is bi-directional and fully asynchronous, meaning that anything can be sent at any time, by either party. There is no serialization of requests.
Dropping the relevant parameters (e.g. api_client and ansar_server) on the ar.connect call reverts the transport to the native protocol. A matching change would be required in the crunch_server. But rather than fiddling with source changes and test servers, this presentation continues with the exploration of other options.
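For reference, the client-side half of that change is a single line in the Client_Start handler; a minimal sketch, keeping the ipp variable already defined there;

ar.connect(self, ipp)    # No api_client or ansar_server, so the native, asynchronous protocol is used.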
HTTP capabilities were added to Ansar for integration purposes. As a general rule, HTTP will be enabled on public APIs and disabled on internal communications between two Ansar network components. Those public APIs inherit the limitations of HTTP (i.e. synchronous operation).
A last feature to be mentioned here is for even more advanced sequences and concurrency. The ar.Concurrently object accepts a list of tuples or ar.CreateFrames; the latter is a special class allowing for the inclusion of any asynchronous object. Demonstration of this feature is beyond the scope of this document. Suffice to say that it allows for extended interactions between parties rather than the simple request-response exchange offered by the tuple option. Adding a concurrent file transfer can take a single line of code.
When Things Go Wrong
Undesirable or unexpected conditions are detected by Ansar and generate error messages, e.g. application interrupt (i.e. control-c), timeout and unavailability of a service. These messages have their own associated class definitions, such as ar.Aborted and ar.TimedOut, and all of these messages derive from the ar.Faulted class. Adding error handling in Ansar is mostly about checking if responses are instances of faults and passing them on.
def Client_Connected(self, message):
    def begin(request, server_address):
        a = self.create(ar.Concurrently,
            (Divide(4.0, 2.0), server_address),
            (request, server_address),
            seconds=3.0
        )
        self.then(a, step_1)

    def step_1(response):
        if isinstance(response, ar.Faulted):
            self.complete(response)
        self.complete(response[1])

    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    begin(request, server_address)
This is also an opportunity to test for specific error conditions and add error-related actions. Note that passing a fault to self.complete from the Client object sets the fault as the output of the process. Ansar gives special treatment to fault output, printing a diagnostic on stderr and setting the exit status to a non-zero value.
Both ar.GetResponse and ar.Concurrently accept a number of seconds within which the operation is expected to complete. This is the simplest approach to checking for a wide range of problems, such as network failures and remote server failures.
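The ar.Concurrently example above already shows the keyword. Assuming ar.GetResponse accepts it in the same way, the single-request form of the same check would be;

a = self.create(ar.GetResponse, request, server_address, seconds=3.0)
self.then(a, step_1)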
Network failures are notified and it is possible to propagate this knowledge through your pending requests. But as your networking becomes more advanced than this single-shot client, doing so can become detailed work. And those timeout checks will still be needed in production quality software.
Predefined Ansar faults include;
- Aborted, the process is cleaning up before terminating
- TimedOut, an operation has taken too long
- TemporarilyUnavailable, an address is not currently connected
- Overloaded, a service is not performing to expectations
- OutOfService, a service has been disabled
- NotListening, cannot establish a requested network listen
- NotConnected, cannot establish a client connection
- Closed, a connection was terminated by the local process
- Abandoned, a connection was terminated by the remote process
Application-specific faults should be defined as classes that derive from ar.Faulted and registered using ar.bind.
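As an illustration only, an application-specific fault might be declared as below. The QuotaExceeded name is purely hypothetical, and the ar.Faulted constructor arguments are an assumption based on the condition and explanation members visible in the fault JSON later in this presentation;

class QuotaExceeded(ar.Faulted):
    def __init__(self, condition=None, explanation=None):
        # Hypothetical fault; constructor arguments assumed from the
        # condition/explanation members seen in fault output.
        ar.Faulted.__init__(self, condition, explanation)

ar.bind(QuotaExceeded)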
Working With Multiple Connections
Aside from switching to the native Ansar protocol, there are at least two alternative approaches to achieving concurrency within the constraints of HTTP. One is to open multiple connections to the server and the other is to open multiple connections to multiple instances of the server. From a coding point of view, these two approaches are almost indistinguishable. For ease of testing, this presentation chooses the former.
import ansar.connect as ar
from crunch_api import *

# The client object.
class Client(ar.Point, ar.Stateless):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.settings = settings
        self.table = None
        self.group = None
        self.client_value = None

    def Client_Start(self, message):        # Start the networking.
        host = self.settings.host
        port = self.settings.port
        ipp = ar.HostPort(host, port)
        self.table = ar.GroupTable(
            server_a=ar.CreateFrame(ar.ConnectToAddress, ipp, api_client='/', ansar_server=True),
            server_b=ar.CreateFrame(ar.ConnectToAddress, ipp, api_client='/', ansar_server=True),
        )
        self.group = self.table.create(self, get_ready=8.0)

    def Client_GroupUpdate(self, message):
        self.table.update(message)

    def Client_Ready(self, message):
        def begin(request, server_a, server_b):
            a = self.create(ar.Concurrently,
                (Divide(4.0, 2.0), server_a),
                (request, server_b),
            )
            self.then(a, step_1)

        def step_1(response):
            self.client_value = response[1]
            self.send(ar.Stop(), self.group)

        settings = self.settings
        request = settings.request(settings.x, settings.y)
        server_a = self.table.server_a
        server_b = self.table.server_b

        begin(request, server_a, server_b)

    def Client_Completed(self, message):
        d = self.debrief(self.return_address)
        if isinstance(d, ar.OnCompleted):
            d(message.value)
            return
        # Table has completed.
        self.complete(self.client_value or message.value)

    def Client_Stop(self, message):         # Control-c or software interrupt.
        self.client_value = ar.Aborted()
        self.send(message, self.group)

# Declare the messages expected by the client object.
CLIENT_DISPATCH = [
    ar.Start,           # Initiate networking.
    ar.GroupUpdate,
    ar.Ready,           # Members of group are ready.
    ar.Completed,
    ar.Stop,            # Ctrl-c or programmed interrupt.
]

ar.bind(Client, CLIENT_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, host=None, port=None, request=None, x=None, y=None):
        self.host = host
        self.port = port
        self.request = request
        self.x = x
        self.y = y

SETTINGS_SCHEMA = {
    'host': str,
    'port': int,
    'request': ar.Type,
    'x': float,
    'y': float,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Define default configuration and start the client.
factory_settings = Settings(host='127.0.0.1', port=5051, request=Multiply, x=1.5, y=2.25)

if __name__ == '__main__':
    ar.create_object(Client, factory_settings=factory_settings)
Copy this code to the crunch_multi_client.py file and execute the following commands;
$ cd <crunch_server folder>
$ source .env/bin/activate
$ python3 crunch_multi_client.py --debug-level=DEBUG --x=10.0 --y=15.0
[00205791] 2024-09-19T13:28:32.162 + <00000010>SocketSelect - Created by <00000001>
[00205791] 2024-09-19T13:28:32.162 < <00000010>SocketSelect - Received Start from <00000001>
[00205791] 2024-09-19T13:28:32.162 > <00000010>SocketSelect - Sent SocketChannel to <00000001>
..
Output(value=150.0)
It takes a few moments of scanning the logs to verify that two requests are being delivered to the crunch_server before there is any evidence of the first response making its return journey.
The ar.connect call inside the Client_Start handler has been replaced with the definition of a GroupTable, and creation of a table manager using self.table.create. That table manager works continuously behind the scenes with the goal of establishing the connections described in the GroupTable. It sends progress updates to the client so that variables such as self.table.server_a refer to the proper network connection.
At the moment when a full set of connections is in place, the manager also sends the Ready message. This is an appropriate moment for the client to initiate its own activities. An instance of ar.Concurrently is created and the familiar pair of tuples is passed.
The important difference from previous examples of ar.Concurrently usage is that each request tuple gets its own HTTP session to send to. The two requests arrive at the crunch_server at close to the same moment, side-stepping the constraint of a single, shared HTTP session and creating a context for true concurrency.
The issue of threading in the Python runtime is well known; real threading is needed before there can be true concurrency within a single Python process. Benefits can still be realized with the multiple connection approach, but full concurrency requires multiple instances of the server.
This presentation has illustrated the ease with which multi-connection networking can be established. The same approach allows for networking that includes both connecting and listening. This style of application is referred to here as a component.
Creating A Fully Networked Backend Component
A component is a server that accepts clients and also connects to supporting network services. This is a common scenario in the backend of cloud services. The presented component will make multiple connections to the crunch_server and accept requests over client connections. The curl utility will again be used as the test client. All interactions between the client, the component and the supporting services will be asynchronous;
import ansar.connect as ar
from component_api import *
from crunch_api import *

COMPONENT_API = [
    MulDiv,
    DivMul,
]

# The Component object.
class Component(ar.Point, ar.Stateless):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.settings = settings
        self.table = None
        self.group = None

    def Component_Start(self, message):     # Start the networking.
        crunch_ipp = self.settings.crunch
        public_ipp = self.settings.public
        self.table = ar.GroupTable(
            server_a=ar.CreateFrame(ar.ConnectToAddress, crunch_ipp, api_client='/', ansar_server=True),
            server_b=ar.CreateFrame(ar.ConnectToAddress, crunch_ipp, api_client='/', ansar_server=True),
            public=ar.CreateFrame(ar.ListenAtAddress, public_ipp, api_server=COMPONENT_API),
        )
        self.group = self.table.create(self)

    def Component_GroupUpdate(self, message):
        self.table.update(message)

    def Component_MulDiv(self, message):
        f = ar.roll_call(self.table)
        if f:
            self.send(f, self.return_address)
            return

        def begin(mul, div, server_a, server_b, return_address):
            a = self.create(ar.Concurrently,
                (mul, server_a),
                (div, server_b),
            )
            self.then(a, step_1, return_address)

        def step_1(response, return_address):
            value = response[0].value + response[1].value
            self.send(Output(value), return_address)

        mul = Multiply(message.a, message.b)
        div = Divide(message.c, message.d)
        server_a = self.table.server_a
        server_b = self.table.server_b
        return_address = self.return_address

        begin(mul, div, server_a, server_b, return_address)

    def Component_DivMul(self, message):
        f = ar.roll_call(a=self.table.server_a, b=self.table.server_b)
        if f:
            self.send(f, self.return_address)
            return

        def begin(div, mul, server_a, server_b, return_address):
            a = self.create(ar.Concurrently,
                (div, server_a),
                (mul, server_b),
            )
            self.then(a, step_1, return_address)

        def step_1(response, return_address):
            value = response[0].value + response[1].value
            self.send(Output(value), return_address)

        div = Divide(message.a, message.b)
        mul = Multiply(message.c, message.d)
        server_a = self.table.server_a
        server_b = self.table.server_b
        return_address = self.return_address

        begin(div, mul, server_a, server_b, return_address)

    def Component_Completed(self, message):
        d = self.debrief(self.return_address)
        if isinstance(d, ar.OnCompleted):
            d(message.value)
            return
        # Table has terminated.
        self.complete(ar.Aborted())

    def Component_Stop(self, message):      # Control-c or software interrupt.
        self.send(message, self.group)

# Declare the messages expected by the component object.
COMPONENT_DISPATCH = [
    ar.Start,
    ar.GroupUpdate,
    ar.Completed,
    ar.Stop,
    COMPONENT_API,
]

ar.bind(Component, COMPONENT_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, crunch=None, public=None):
        self.crunch = crunch or ar.HostPort()
        self.public = public or ar.HostPort()

SETTINGS_SCHEMA = {
    'crunch': ar.UserDefined(ar.HostPort),
    'public': ar.UserDefined(ar.HostPort),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Define default configuration and start the component.
factory_settings = Settings(crunch=ar.HostPort('127.0.0.1', 5051),
    public=ar.HostPort('127.0.0.1', 5052),
)

if __name__ == '__main__':
    ar.create_object(Component, factory_settings=factory_settings)
Copy this code to the crunch_component.py file. The component_api.py file will also be needed. This describes an unlikely API that is useful only in that each request requires two underlying requests to the crunch_server;
import ansar.encode as ar
from crunch_api import Output

__all__ = [
    'MulDiv',
    'DivMul',
]

# Declare the API.
class MulDiv(object):
    def __init__(self, a=0.0, b=0.0, c=0.0, d=0.0):
        self.a = a
        self.b = b
        self.c = c
        self.d = d

class DivMul(object):
    def __init__(self, a=0.0, b=0.0, c=0.0, d=0.0):
        self.a = a
        self.b = b
        self.c = c
        self.d = d

ar.bind(MulDiv)
ar.bind(DivMul)
Start the new component with the commands;
$ cd <crunch_server folder>
$ source .env/bin/activate
$ python3 crunch_component.py --debug-level=DEBUG
[00003855] 2024-10-06T01:35:23.069 + <00000012>SocketSelect - Created by <00000001>
[00003855] 2024-10-06T01:35:23.069 < <00000012>SocketSelect - Received Start from <00000001>
[00003855] 2024-10-06T01:35:23.069 > <00000012>SocketSelect - Sent SocketChannel to <00000001>
[00003855] 2024-10-06T01:35:23.069 + <00000013>PubSub[INITIAL] - Created by <00000001>
[00003855] 2024-10-06T01:35:23.070 < <00000013>PubSub[INITIAL] - Received Start from <00000001>
[00003855] 2024-10-06T01:35:23.070 + <00000014>object_vector - Created by <00000001>
[00003855] 2024-10-06T01:35:23.070 ~ <00000014>object_vector - Executable "/home/nigel/crunch_component.py" as object process (3855)
To test the arrangement of a component and its supporting service;
$ curl -s 'http://127.0.0.1:5052/MulDiv?a=9.0&b=7.0&c=370.0&d=10.0' | jq '.value[1].value'
100
There is a lot going on in that crunch_component module. Firstly, there is the successful calculation of (9.0 * 7.0) + (370.0 / 10.0) using a concurrent pair of requests from the crunch_component to the crunch_server.
At least as importantly, Ansar is providing automated session management. This happens when the crunch_component is first started and connects to the crunch_server, but it also happens if the crunch_server were to be restarted for some reason. A control-c on the crunch_server, a manual restart and a fresh curl command will verify this automation. Note that it can take a few moments for the reconnection to happen.
This is the kind of behaviour required for production quality software. With the adoption of Ansar as the networking toolset, and the adherence to an asynchronous coding style, this difficult behaviour is achieved with almost no related effort on the part of the developer.
Session management in the Ansar library implements exponential backoff and randomization for the scheduling of connection retries. It also uses different time periods depending on whether the connection is within the localhost, across a LAN or over the Internet.
Attempting to curl while the crunch_server is still down (or the reconnection has not yet happened) will produce an error;
$ curl 'http://127.0.0.1:5052/DivMul?a=9.0&b=7.0&c=370.0&d=10.0'
{
    "value": [
        "ansar.create.lifecycle.TemporarilyUnavailable",
        {
            "condition": "required services \"a, b\" are temporarily unavailable",
            "explanation": "try again later",
            "unavailable": [
                "a",
                "b"
            ]
        },
        []
    ]
}
The HTTP request reached the Component_DivMul handler and then encountered ar.roll_call. This function checks that a set of network addresses are currently connected. Failing that check results in the return of a TemporarilyUnavailable fault that captures the details of the failure. That fault is delivered to the curl command as an HTTP response and curl prints the application/json body.
Two styles of usage are accepted by roll_call. The most concise is to pass the self.table, which contains all the latest network address information courtesy of the Component_GroupUpdate handler. The second style is to specify each address within self.table that is significant to the request that is about to occur, using the named parameter feature of Python.
The first style takes the least coding effort, but at runtime checks all the addresses, including any irrelevant ones. The names of missing connections appearing in the fault are also the names of the members found in the self.table. Using the second style requires a little more effort, performs only the necessary checks and gives control over the names that appear in the fault.
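Both styles already appear in the component above; pulled out side by side, and assuming the same self.table members, they look like this;

# Style 1: check every connection in the table.
f = ar.roll_call(self.table)

# Style 2: check only the relevant addresses, under chosen names.
f = ar.roll_call(a=self.table.server_a, b=self.table.server_b)

# In either case a non-None result is a fault that can be
# sent straight back to the caller.
if f:
    self.send(f, self.return_address)
    return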
Adding a seconds parameter to all the steps in a transaction completes a clean implementation of error handling, i.e. for network errors and remote server failures. In the event a connection is lost, the corresponding member value will be set to None and all subsequent ar.roll_call checks will fail. Any transactions that are already underway will eventually time out.
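A minimal sketch of what that looks like in the Component_MulDiv handler, combining the seconds keyword with the fault check shown earlier; sending the fault back to the caller, rather than completing the component, is an assumption about the desired behaviour;

def begin(mul, div, server_a, server_b, return_address):
    a = self.create(ar.Concurrently,
        (mul, server_a),
        (div, server_b),
        seconds=3.0
    )
    self.then(a, step_1, return_address)

def step_1(response, return_address):
    if isinstance(response, ar.Faulted):
        self.send(response, return_address)     # Report the fault to the caller.
        return
    value = response[0].value + response[1].value
    self.send(Output(value), return_address)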
In Ansar, sending a request to an address that is no longer valid — e.g. a lost connection — is harmless. The message effectively falls on the floor and no error is reported. Where a connection is recovered and there are still active transactions, those transactions will continue to time out. Any attempts to communicate by those transactions will still be using the old, defunct address.
Summary
With fewer than 250 lines of source code, this presentation has shown how to implement a small collection of processes that perform advanced networking operations. These processes are suitable for production deployment, with robust error handling and session management.
The Ansar library supports significant features beyond what has been shown here. These include the ability to self-manage the assignment of network addresses (i.e. zero-conf networking) and more. For a solution to operational issues such as overloading, refer to Writing Python Servers That Know About Service Expectations.