Python Networking On Steroids

Scott Woods
18 min read · Oct 7, 2024


Asynchronicity, Concurrency And Clarity

Photo by Alina Grubnyak on Unsplash

This is a presentation of one way to write advanced networking software in Python. It tackles difficult topics such as asynchronicity and concurrency. At the same time, code clarity remains a primary motivation. There is a series of code modules intended as illustration of different points, with the final module offered as a starting point for your next ambitious networking project.

The first part of this presentation is maintained as a separate document that can be found here. This is because there is standalone value in presentation of a clear, concise HTTP API server that is unencumbered with the more advanced issues tackled here. Working through that presentation will introduce relevant background information on the adopted toolset (i.e. Ansar) and will also result in a running network server (i.e. the crunch_server). The latter is used as a test server in this presentation.

Ansar-based applications are supported on several distributions of Linux, macOS 14 and Windows 11. Process orchestration is also supported through the ansar CLI. Due to technical differences between these platforms, the latter is not supported on Windows.

Familiarity with terms such as asynchronous, event-driven, FSM, RPC, dispatching, handlers, callbacks, concurrency and distributed transaction will help when reading this presentation.

Starting With A Plain Client

A good start is to look at a plain HTTP request-response interaction with a server. Assuming the crunch_server is currently listening at the address 127.0.0.1:5051, copy the following code into crunch_client.py alongside the server file. Placing it there gives access to the definition module, crunch_api, that appeared in the separate presentation;

import ansar.connect as ar

from crunch_api import *

# The client object.
class Client(ar.Point, ar.Stateless):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.settings = settings

    def Client_Start(self, message):         # Start the networking.
        host = self.settings.host
        port = self.settings.port
        ipp = ar.HostPort(host, port)
        ar.connect(self, ipp, api_client='/', ansar_server=True)

    def Client_Connected(self, message):
        settings = self.settings
        request = settings.request(settings.x, settings.y)

        def completed_1(response):           # What to do when transaction completes.
            self.complete(response)          # Terminate with the response as the output.

        a = self.create(ar.GetResponse, request, self.return_address)

        self.assign(a, ar.OnCompleted(completed_1))

    def Client_Completed(self, message):
        d = self.debrief(self.return_address)
        if isinstance(d, ar.OnCompleted):
            d(message.value)

    def Client_NotConnected(self, message):  # No networking.
        self.complete(message)

    def Client_Stop(self, message):          # Control-c or software interrupt.
        self.abort(ar.Aborted())

# Declare the messages expected by the client object.
CLIENT_DISPATCH = [
    ar.Start,           # Initiate networking.
    ar.Connected,       # Ready to send.
    ar.Completed,       # A child object (e.g. ar.GetResponse) has finished.
    ar.NotConnected,    # Networking failed.
    ar.Stop,            # Ctrl-c or programmed interrupt.
]

ar.bind(Client, CLIENT_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, host=None, port=None, request=None, x=None, y=None):
        self.host = host
        self.port = port
        self.request = request
        self.x = x
        self.y = y

SETTINGS_SCHEMA = {
    'host': str,
    'port': int,
    'request': ar.Type,
    'x': float,
    'y': float,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Define default configuration and start the client.
factory_settings = Settings(host='127.0.0.1', port=5051, request=Multiply, x=1.5, y=2.25)

if __name__ == '__main__':
    ar.create_object(Client, factory_settings=factory_settings)

To run the client, use the following commands;

$ cd <crunch_server folder>
$ source .env/bin/activate
$ python3 crunch_client.py --debug-level=DEBUG --x=6.0 --y=7.0
Output(value=42.0)

Request values are accepted on the command line using the --member=value notation. The request is successfully submitted and the response printed in a human-readable form.

The crunch_client is structured in a fashion similar to the crunch_server. The technical reasons for this are covered in the description of the crunch_server. In brief this involves creation of a main, asynchronous object — in this case an instance of Client — using ar.create_object. The Ansar library then supplies a Start notification which the client uses to initiate a network connection. Ansar also supplies the subsequent ar.Connected notification which the client assumes is a good moment to start the request-response exchange.

Arguments passed to the ar.connect function arrange for specific handling of messages passing over the network connection;

  • api_client — enable use of the HTTP protocol and set the URI prefix,
  • ansar_server — enable the auto-decode of the HTTP response body.

Sending The Request And Receiving A Response

The real work begins when the ar.Connected message is dispatched to the Client_Connected handler. This is a session control message generated by Ansar as part of its networking obligations. A request is constructed using the contents of the self.settings object and sent on its journey using;

a = self.create(ar.GetResponse, request, self.return_address)

Ansar assigns a special address to self.return_address such that it effectively refers to the object at the end of the new connection, in the remote process.

This line of code forms a fresh instance of the ar.GetResponse object, as a child of the Client object. This is an asynchronous object in the same manner as the client object. It is created, sends and receives messages and eventually completes. Arguments can be passed to the new object (e.g. request and self.return_address) and completion generates a message to the parent, i.e. ar.Completed. That message contains a single output value.

The ar.GetResponse object sends the request to the specified address. The next received message causes it to complete with the received message as its output value.

In Ansar, the self.send(message, address) method is available to all asynchronous objects — the Client object could have called this method directly but instead has delegated this simple action to a child object. The reasons for choosing the ar.GetResponse approach will become clearer in later sections but most importantly the use of ar.GetResponse creates a unique context for the interaction between the GetResponse and the server. The Client_Connected handler is guaranteed to be called exactly once, but in the more general case a handler may be called repeatedly, creating the potential for another upstream message to arrive before the previous request-response has had a chance to complete. Associating responses with their original requests becomes problematic.

The next line arranges for a callback;

def completed_1(response):      # What to do when transaction completes.
    self.complete(response)     # Terminate with the response as the output.

a = self.create(ar.GetResponse, request, self.return_address)

self.assign(a, ar.OnCompleted(completed_1))

A function is defined and then passed to the constructor of an ar.OnCompleted object. That object is saved as the callback associated with the address of the ar.GetResponse object. Execution of the callback occurs in the Client_Completed handler;

def Client_Completed(self, message):
    d = self.debrief(self.return_address)
    if isinstance(d, ar.OnCompleted):
        d(message.value)

The self.debrief function does a lookup for the information saved by the self.assign function. The information is verified and the callback executed. The OnCompleted.__call__ method accepts the result of the completed object (i.e. the ar.GetResponse) and passes this to the saved function as the first parameter. Any additional parameters passed to the OnCompleted.__init__ method are also passed to the saved function.
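The save-then-look-up mechanism can be pictured with a small, plain-Python sketch. This is not Ansar's implementation: the CallbackTable class, the integer address and the label argument are hypothetical stand-ins, and the sketch assumes that the lookup also removes the saved entry.

```python
# Hypothetical stand-ins for illustration only; not the Ansar classes.
class OnCompleted:
    """Save a function plus any extra arguments for later execution."""
    def __init__(self, routine, *args):
        self.routine = routine
        self.args = args

    def __call__(self, value):
        # The completion value is always passed as the first parameter,
        # followed by the extra arguments saved at construction.
        return self.routine(value, *self.args)


class CallbackTable:
    """Associate one saved callback with the address of each child."""
    def __init__(self):
        self.pending = {}

    def assign(self, address, callback):
        self.pending[address] = callback

    def debrief(self, address):
        # Assumption: the lookup removes the entry; None if nothing was saved.
        return self.pending.pop(address, None)


table = CallbackTable()
results = []

def completed_1(response, label):
    results.append((label, response))

# Save a callback against the (made-up) address of a child object.
table.assign(17, OnCompleted(completed_1, 'multiply'))

# Later, a completion arrives from the child at address 17.
d = table.debrief(17)
if isinstance(d, OnCompleted):
    d(42.0)

print(results)   # [('multiply', 42.0)]
```

Keying the table by child address is what lets several exchanges be in flight at once without their responses being confused.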

The calling environment provided by the Client_Completed handler is very close to the environment where the completed_1 function was defined; the two handlers are effectively methods of the same object. This is significant to the successful execution of callbacks.

These details form the basis of truly asynchronous handling of request-response exchanges;

  • receive an event, e.g. the network notification,
  • create the ar.GetResponse object,
  • save a callback for when the object completes,
  • execute the callback, passing the result of the ar.GetResponse.

For this initial foray into asynchronous networking, the callback simply terminates the Client using self.complete. Termination of the Client also terminates the process. Passing the response to the self.complete function causes the printing of the calculated value.

Adding --pure-object to the command line causes output of raw JSON. Useful if there is post-processing to do.

Sequences Of Requests

Now consider the scenario where the client needs to make an initial request before the single request that was sent previously;

def Client_Connected(self, message):
    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    def step_1(response, request, server_address):
        a = self.create(ar.GetResponse, request, server_address)
        self.assign(a, ar.OnCompleted(step_2))

    def step_2(response):
        self.complete(response)

    a = self.create(ar.GetResponse, Divide(4.0, 2.0), server_address)

    self.assign(a, ar.OnCompleted(step_1, request, server_address))

This new handler describes a sequence of request-response exchanges. A callback that creates a new ar.GetResponse object (i.e. step_1) is the glue that continues the sequence. Note the passing of additional parameters to the second ar.OnCompleted. These parameters are forwarded when the callback executes and ensures that the proper values are used.

Implementation of the Client_Connected handler may seem fragmented, and understanding how the thread of execution passes through the different steps can take some getting used to. It's worth noting that the collection of code fragments contained within the definition of Client_Connected comprises the implementation of a multi-phase, distributed transaction that is also fully asynchronous.

This style of implementation should be compared with other styles of implementation that achieve the same degree of sophistication, especially with respect to asynchronous operation. This refers to operationally equivalent implementations of Client_Connected in languages other than Python, or in Python but without the use of Ansar. The material that follows will show that this style of implementation supports concurrency with zero additional effort on the part of the developer, and can also operate seamlessly across multiple supporting network services.

By adopting a few layout conventions and with the use of a supporting method, an asynchronous handler can look like this;

def Client_Connected(self, message):
    # Define the multiple parts.
    def begin(request, server_address):
        a = self.create(ar.GetResponse, Divide(4.0, 2.0), server_address)
        self.then(a, step_1, request, server_address)

    def step_1(response, request, server_address):
        a = self.create(ar.GetResponse, request, server_address)
        self.then(a, step_2)

    def step_2(response):
        self.complete(response)

    # Resolve variables.
    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    # Start the transaction.
    begin(request, server_address)

There is a sequence of local function definitions starting with begin and ending with step_2. The former captures the context of a unique asynchronous operation and the latter terminates the instance, by not calling self.then. Calling then is a shorthand for creation of the OnCompleted object and passing it to assign.
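How the thread of execution hops from begin to step_1 to step_2 can be traced with a toy model. The sketch below is plain Python and assumes nothing about Ansar internals: MiniPoint, its inbox queue and its run loop are invented for illustration, and the "children" are ordinary functions whose results are delivered later as completions.

```python
from collections import deque

class MiniPoint:
    """Toy single-threaded object; children complete via a message queue."""
    def __init__(self):
        self.inbox = deque()      # Pending (child address, value) completions.
        self.pending = {}         # Child address -> (function, extra args).
        self.next_address = 1
        self.output = None

    def create(self, work, *args):
        # Pretend a child runs asynchronously; its result arrives later.
        address = self.next_address
        self.next_address += 1
        self.inbox.append((address, work(*args)))
        return address

    def then(self, address, routine, *args):
        # Shorthand for saving a callback against the child address.
        self.pending[address] = (routine, args)

    def complete(self, value):
        self.output = value

    def run(self):
        # Deliver completions one at a time, as a dispatcher would.
        while self.inbox:
            address, value = self.inbox.popleft()
            routine, args = self.pending.pop(address)
            routine(value, *args)
        return self.output


def transaction(point, x, y):
    def begin(x, y):
        a = point.create(lambda: 4.0 / 2.0)
        point.then(a, step_1, x, y)

    def step_1(response, x, y):
        a = point.create(lambda: x * y)
        point.then(a, step_2, response)

    def step_2(response, first):
        # No call to then(), so the sequence ends here.
        point.complete((first, response))

    begin(x, y)


point = MiniPoint()
transaction(point, 6.0, 7.0)
result = point.run()
print(result)   # (2.0, 42.0)
```

Each step returns immediately after registering the next one; nothing blocks while a response is outstanding, which is the property the layout convention is designed to preserve.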

The function bodies are restricted to the use of self methods and the parameters passed to self.then (e.g. pass message or its members, to ensure the functions use the correct instance values). It is “thread safe” to access self data members, as all class handlers, including completions, execute in the same thread throughout the life of the object (e.g. the Client). Take note that those data members may change between the calls to begin, step_1 and step_2, courtesy of other Client handlers.

These are rather technical constraints on the code that can appear in the sequence of functions; coders that make use of local functions will understand the underlying reasons. Alternative strategies were explored but these were generally more heavy-handed, i.e. they required more setup and spread the functions beyond the definition of their “parent” handler. The Ansar library uses the SDL model for asynchronous operation, which goes beyond the Python model (async and await). The SDL model is based around active objects and message passing.

Concurrent Requests

It is significant that no dependency exists between the first request and the second — the response from the first request is not used in creating the second request. This satisfies the basic requirements of concurrency;

def Client_Connected(self, message):
    def begin(request, server_address):
        a = self.create(ar.Concurrently,
            (Divide(4.0, 2.0), server_address),
            (request, server_address),
        )
        self.then(a, step_1)

    def step_1(response):
        self.complete(response[1])

    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    begin(request, server_address)

The changes to implement concurrent operation are trivial. The two uses of ar.GetResponse are replaced with a single use of ar.Concurrently. The two request-response operations are expressed as two tuples on the parameter list and the response is a list of matching length. That list contains the pair of responses associated with the pair of requests. Internally, ar.Concurrently passes the information from its parameter list on to instances of ar.GetResponse.

The ar.Sequentially class has also been made available. It accepts the same parameters as ar.Concurrently but — as you would expect — performs the separate operations one at a time. This arrangement offers no opportunity for custom processing after each operation, implying that the entire sequence could have been performed concurrently. It is provided for scenarios such as the convenient comparison of concurrent vs sequential operation, i.e. verifying and quantifying the benefit of concurrency.
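The kind of comparison described here can be sketched without Ansar at all. The example below swaps in Python's asyncio purely as a convenient stand-in (Ansar uses its own message-passing model), and get_response, with its fixed 0.1 second delay, is a hypothetical substitute for a real request-response exchange.

```python
import asyncio
import time

async def get_response(request, delay=0.1):
    # Stand-in for one request-response exchange with a server.
    await asyncio.sleep(delay)
    return request.upper()

async def sequentially(*requests):
    # One exchange at a time; total time is the sum of the delays.
    return [await get_response(r) for r in requests]

async def concurrently(*requests):
    # All exchanges in flight together; responses keep request order.
    return await asyncio.gather(*(get_response(r) for r in requests))

def timed(operation, *requests):
    started = time.perf_counter()
    responses = asyncio.run(operation(*requests))
    return list(responses), time.perf_counter() - started

seq_responses, seq_elapsed = timed(sequentially, 'divide', 'multiply')
con_responses, con_elapsed = timed(concurrently, 'divide', 'multiply')

print(seq_responses == con_responses)              # Same list either way.
print(f'{seq_elapsed:.2f}s vs {con_elapsed:.2f}s')  # Timings vary by machine.
```

Both forms return a list of matching length and order, so switching between them changes the timing and nothing else.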

Sequences, Concurrency And Underlying Truths

The two techniques can be combined into sequences where each step is either an ar.GetResponse or an ar.Concurrently. Of course, to really demonstrate this capability requires the presence of multiple sources (i.e. network servers). Before tackling the connection to multiple servers there is an underlying technical issue that must be brought to light.

HTTP is a synchronous or blocking protocol and, courtesy of the parameters passed to the ar.connect function, only HTTP request and response messages are being passed across the network transport. This is demonstrably the case given the use of the curl command as the convenient testing utility for the crunch_server. Ansar does a good job of hiding these kinds of details; the application only ever sees objects such as Multiply and Output.

HTTP requests are discreetly serialized by Ansar (i.e. not the process of generating text representations of application data). It does not send a new request until the response to the previous request has been received. This is great in that it allows for asynchronous code (e.g. ar.Concurrently) within the crunch_client. It’s not so great in that it defeats the point of concurrency.
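The effect of this queuing can be illustrated with a rough model. The sketch below again uses asyncio as a stand-in and is not how Ansar implements its HTTP sessions: HttpLikeConnection and its lock are hypothetical, and the 0.1 second delay plays the part of the server. For contrast it also times the same pair of requests spread over two separate connections.

```python
import asyncio
import time

class HttpLikeConnection:
    """Allows one request in flight at a time, like a plain HTTP session."""
    def __init__(self):
        self.busy = asyncio.Lock()

    async def request(self, name, delay=0.1):
        async with self.busy:              # Wait for the previous response.
            await asyncio.sleep(delay)     # The server works on the request.
            return name

async def elapsed_for(connections):
    # Launch two requests together, spread over the given connections.
    started = time.perf_counter()
    await asyncio.gather(
        connections[0].request('divide'),
        connections[-1].request('multiply'),
    )
    return time.perf_counter() - started

async def main():
    shared = await elapsed_for([HttpLikeConnection()])
    separate = await elapsed_for([HttpLikeConnection(), HttpLikeConnection()])
    return shared, separate

shared, separate = asyncio.run(main())
print(f'shared: {shared:.2f}s, separate: {separate:.2f}s')
```

On the shared connection the second request cannot start until the first response is back, so the calling code is asynchronous but the work on the wire is not concurrent.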

This situation highlights the reason that Ansar has its own native protocol that it uses for the transfer of messages across network transports. This protocol is bi-directional and fully asynchronous, meaning that anything can be sent at any time, by either party. There is no serialization of requests.

Dropping the relevant parameters (e.g. api_client and ansar_server) on the ar.connect call reverts the transport to the native protocol. A matching change would be required in the crunch_server. But rather than fiddling with source changes and test servers, this presentation continues with the exploration of other options.

HTTP capabilities were added to Ansar for integration purposes. As a general rule it will be enabled on public APIs and disabled on internal communications between two Ansar network components. Those public APIs inherit the limitations of HTTP (i.e. synchronous operation).

A last feature to be mentioned here is for even more advanced sequences and concurrency. The ar.Concurrently object accepts a list of tuples or ar.CreateFrame objects; the latter is a special class allowing for the inclusion of any asynchronous object. Demonstration of this feature is beyond the scope of this document. Suffice to say that this allows for extended interactions between parties rather than the simple request-response exchange offered by the tuple option. Adding a concurrent file transfer can take a single line of code.

When Things Go Wrong

Undesirable or unexpected conditions are detected by Ansar and generate error messages, e.g. application interrupt (i.e. control-c), timeout and unavailability of a service. These messages have their own associated class definitions, such as ar.Aborted and ar.TimedOut, and all of these messages derive from the ar.Faulted class. Adding error handling in Ansar is mostly about checking if responses are instances of faults and passing them on.

def Client_Connected(self, message):
    def begin(request, server_address):
        a = self.create(ar.Concurrently,
            (Divide(4.0, 2.0), server_address),
            (request, server_address),
            seconds=3.0
        )

        self.then(a, step_1)

    def step_1(response):
        if isinstance(response, ar.Faulted):
            self.complete(response)
        self.complete(response[1])

    settings = self.settings
    request = settings.request(settings.x, settings.y)
    server_address = self.return_address

    begin(request, server_address)

This is also an opportunity to test for specific error conditions and add error-related actions. Note that passing a fault to self.complete from the Client object is setting the fault as the output of the process. Ansar gives special treatment to fault output, printing a diagnostic on stderr and setting the exit status to a non-zero value.

Both ar.GetResponse and ar.Concurrently accept a number of seconds within which the operation is expected to complete. This is the simplest approach to checking for a wide range of problems, such as network failures and remote server failures.
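The idea of a time limit that yields a fault as an ordinary response value, rather than raising an exception, can be sketched as follows. This is an asyncio-based illustration under stated assumptions, not the Ansar mechanism: Faulted and TimedOut here are local stand-in classes, and with_seconds and get_response are hypothetical helpers.

```python
import asyncio

class Faulted:
    """Local stand-in for a base fault class."""
    def __init__(self, condition):
        self.condition = condition

class TimedOut(Faulted):
    pass

async def get_response(request, delay):
    # Stand-in for a server that takes `delay` seconds to answer.
    await asyncio.sleep(delay)
    return request * 2

async def with_seconds(operation, seconds):
    # Deliver a timeout as a value so the next step can test for it.
    try:
        return await asyncio.wait_for(operation, timeout=seconds)
    except asyncio.TimeoutError:
        return TimedOut(f'no response within {seconds} seconds')

def step_1(response):
    # The same check-and-pass-on pattern used by the handlers.
    if isinstance(response, Faulted):
        return f'fault: {response.condition}'
    return f'value: {response}'

fast = step_1(asyncio.run(with_seconds(get_response(21, 0.01), seconds=1.0)))
slow = step_1(asyncio.run(with_seconds(get_response(21, 1.0), seconds=0.05)))

print(fast)   # value: 42
print(slow)   # fault: no response within 0.05 seconds
```

Because the fault arrives through the same path as a normal response, a single isinstance check covers every failure that ends in a timeout.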

Network failures are notified and it is possible to propagate this knowledge through your pending requests. But as your networking becomes more advanced than this single-shot client, doing so can become detailed work. And those timeout checks will still be needed in production-quality software.

Predefined Ansar faults include;

  • Aborted, the process is cleaning up before terminating
  • TimedOut, an operation has taken too long
  • TemporarilyUnavailable, an address is not currently connected
  • Overloaded, a service is not performing to expectations
  • OutOfService, a service has been disabled
  • NotListening, cannot establish a requested network listen
  • NotConnected, cannot establish a client connection
  • Closed, a connection was terminated by the local process
  • Abandoned, a connection was terminated by the remote process

Application-specific faults should be defined as classes that derive from ar.Faulted and registered using ar.bind.

Working With Multiple Connections

Aside from switching to the native Ansar protocol, there are at least two alternative approaches to achieving concurrency within the constraints of HTTP. One is to open multiple connections to the server and the other is to open multiple connections to multiple instances of the server. From a coding point of view, these two approaches are almost indistinguishable. For ease of testing, this presentation chooses the former.

import ansar.connect as ar

from crunch_api import *

# The client object.
class Client(ar.Point, ar.Stateless):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.settings = settings
        self.table = None
        self.group = None
        self.client_value = None

    def Client_Start(self, message):    # Start the networking.
        host = self.settings.host
        port = self.settings.port
        ipp = ar.HostPort(host, port)

        self.table = ar.GroupTable(
            server_a=ar.CreateFrame(ar.ConnectToAddress, ipp, api_client='/', ansar_server=True),
            server_b=ar.CreateFrame(ar.ConnectToAddress, ipp, api_client='/', ansar_server=True),
        )
        self.group = self.table.create(self, get_ready=8.0)

    def Client_GroupUpdate(self, message):
        self.table.update(message)

    def Client_Ready(self, message):
        def begin(request, server_a, server_b):
            a = self.create(ar.Concurrently,
                (Divide(4.0, 2.0), server_a),
                (request, server_b),
            )
            self.then(a, step_1)

        def step_1(response):
            self.client_value = response[1]
            self.send(ar.Stop(), self.group)

        settings = self.settings
        request = settings.request(settings.x, settings.y)
        server_a = self.table.server_a
        server_b = self.table.server_b

        begin(request, server_a, server_b)

    def Client_Completed(self, message):
        d = self.debrief(self.return_address)
        if isinstance(d, ar.OnCompleted):
            d(message.value)
            return

        # Table has completed.
        self.complete(self.client_value or message.value)

    def Client_Stop(self, message):     # Control-c or software interrupt.
        self.client_value = ar.Aborted()
        self.send(message, self.group)

# Declare the messages expected by the client object.
CLIENT_DISPATCH = [
    ar.Start,           # Initiate networking.
    ar.GroupUpdate,     # Progress of the table connections.
    ar.Ready,           # Members of group are ready.
    ar.Completed,       # A child object or the table has finished.
    ar.Stop,            # Ctrl-c or programmed interrupt.
]

ar.bind(Client, CLIENT_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, host=None, port=None, request=None, x=None, y=None):
        self.host = host
        self.port = port
        self.request = request
        self.x = x
        self.y = y

SETTINGS_SCHEMA = {
    'host': str,
    'port': int,
    'request': ar.Type,
    'x': float,
    'y': float,
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Define default configuration and start the client.
factory_settings = Settings(host='127.0.0.1', port=5051, request=Multiply, x=1.5, y=2.25)

if __name__ == '__main__':
    ar.create_object(Client, factory_settings=factory_settings)

Copy this code to the crunch_multi_client.py file and execute the following command;

$ cd <crunch_server folder>
$ source .env/bin/activate
$ python3 crunch_multi_client.py --debug-level=DEBUG --x=10.0 --y=15.0
[00205791] 2024-09-19T13:28:32.162 + <00000010>SocketSelect - Created by <00000001>
[00205791] 2024-09-19T13:28:32.162 < <00000010>SocketSelect - Received Start from <00000001>
[00205791] 2024-09-19T13:28:32.162 > <00000010>SocketSelect - Sent SocketChannel to <00000001>
..
Output(value=150.0)

It takes a few moments of scanning the logs to verify that two requests are being delivered to the crunch_server before there is any evidence of the first response making its return journey.

The ar.connect call inside the Client_Start handler has been replaced with the definition of a GroupTable, and creation of a table manager using self.table.create. That table manager works continuously behind the scenes with the goal of establishing the connections described in the GroupTable. It sends progress updates to the client so that variables such as self.table.server_a refer to the proper network connection.

At that moment when a full set of connections is in place, the manager also sends the Ready message. This is an appropriate moment for the client to initiate its own activities. An instance of ar.Concurrently is created and the familiar pair of tuples are passed.

The important difference to previous examples of ar.Concurrently usage is that each request tuple pair gets its own instance of an HTTP session to send to. The two requests arrive at the crunch_server at close to the same moment, side-stepping the constraint of a single, shared HTTP session and creating a context for true concurrency.

The issue of threading in the Python runtime is well known and real threading is needed before there can be true concurrency within a single Python process. Benefits can still be realized with the multiple connection approach but full concurrency requires multiple instances of the server.

This presentation has illustrated the ease with which multi-connection networking can be established. The same approach allows for networking that includes both connecting and listening. This style of application is referred to here as a component.

Creating A Fully Networked Backend Component

A component is a server that accepts clients and also connects to supporting network services. This is a common scenario in the backend of cloud services. The presented component will make multiple connections to the crunch_server and accept requests over client connections. The curl utility will again be used as the test client. All interactions between the client, the component and the supporting services will be asynchronous;

import ansar.connect as ar

from component_api import *
from crunch_api import *

COMPONENT_API = [
    MulDiv,
    DivMul,
]

# The Component object.
class Component(ar.Point, ar.Stateless):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.settings = settings
        self.table = None
        self.group = None

    def Component_Start(self, message):     # Start the networking.
        crunch_ipp = self.settings.crunch
        public_ipp = self.settings.public

        self.table = ar.GroupTable(
            server_a=ar.CreateFrame(ar.ConnectToAddress, crunch_ipp, api_client='/', ansar_server=True),
            server_b=ar.CreateFrame(ar.ConnectToAddress, crunch_ipp, api_client='/', ansar_server=True),
            public=ar.CreateFrame(ar.ListenAtAddress, public_ipp, api_server=COMPONENT_API),
        )
        self.group = self.table.create(self)

    def Component_GroupUpdate(self, message):
        self.table.update(message)

    def Component_MulDiv(self, message):
        f = ar.roll_call(self.table)
        if f:
            self.send(f, self.return_address)
            return

        def begin(mul, div, server_a, server_b, return_address):
            a = self.create(ar.Concurrently,
                (mul, server_a),
                (div, server_b),
            )

            self.then(a, step_1, return_address)

        def step_1(response, return_address):
            value = response[0].value + response[1].value
            self.send(Output(value), return_address)

        mul = Multiply(message.a, message.b)
        div = Divide(message.c, message.d)
        server_a = self.table.server_a
        server_b = self.table.server_b
        return_address = self.return_address

        begin(mul, div, server_a, server_b, return_address)

    def Component_DivMul(self, message):
        f = ar.roll_call(a=self.table.server_a, b=self.table.server_b)
        if f:
            self.send(f, self.return_address)
            return

        def begin(div, mul, server_a, server_b, return_address):
            a = self.create(ar.Concurrently,
                (div, server_a),
                (mul, server_b),
            )

            self.then(a, step_1, return_address)

        def step_1(response, return_address):
            value = response[0].value + response[1].value
            self.send(Output(value), return_address)

        div = Divide(message.a, message.b)
        mul = Multiply(message.c, message.d)

        server_a = self.table.server_a
        server_b = self.table.server_b
        return_address = self.return_address

        begin(div, mul, server_a, server_b, return_address)

    def Component_Completed(self, message):
        d = self.debrief(self.return_address)
        if isinstance(d, ar.OnCompleted):
            d(message.value)
            return

        # Table has terminated.
        self.complete(ar.Aborted())

    def Component_Stop(self, message):      # Control-c or software interrupt.
        self.send(message, self.group)

# Declare the messages expected by the component object.
COMPONENT_DISPATCH = [
    ar.Start,
    ar.GroupUpdate,
    ar.Completed,
    ar.Stop,
    COMPONENT_API,
]

ar.bind(Component, COMPONENT_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, crunch=None, public=None):
        self.crunch = crunch or ar.HostPort()
        self.public = public or ar.HostPort()

SETTINGS_SCHEMA = {
    'crunch': ar.UserDefined(ar.HostPort),
    'public': ar.UserDefined(ar.HostPort),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Define default configuration and start the component.
factory_settings = Settings(crunch=ar.HostPort('127.0.0.1', 5051),
    public=ar.HostPort('127.0.0.1', 5052),
)

if __name__ == '__main__':
    ar.create_object(Component, factory_settings=factory_settings)

Copy this code to the crunch_component.py file. The component_api.py file will also be needed. This describes an unlikely API that is useful only in that each request requires two underlying requests to the crunch_server;

import ansar.encode as ar

from crunch_api import Output

__all__ = [
    'MulDiv',
    'DivMul',
]

# Declare the API.
class MulDiv(object):
    def __init__(self, a=0.0, b=0.0, c=0.0, d=0.0):
        self.a = a
        self.b = b
        self.c = c
        self.d = d

class DivMul(object):
    def __init__(self, a=0.0, b=0.0, c=0.0, d=0.0):
        self.a = a
        self.b = b
        self.c = c
        self.d = d

ar.bind(MulDiv)
ar.bind(DivMul)

Start the new component with the commands;

$ cd <crunch_server folder>
$ source .env/bin/activate
$ python3 crunch_component.py --debug-level=DEBUG
[00003855] 2024-10-06T01:35:23.069 + <00000012>SocketSelect - Created by <00000001>
[00003855] 2024-10-06T01:35:23.069 < <00000012>SocketSelect - Received Start from <00000001>
[00003855] 2024-10-06T01:35:23.069 > <00000012>SocketSelect - Sent SocketChannel to <00000001>
[00003855] 2024-10-06T01:35:23.069 + <00000013>PubSub[INITIAL] - Created by <00000001>
[00003855] 2024-10-06T01:35:23.070 < <00000013>PubSub[INITIAL] - Received Start from <00000001>
[00003855] 2024-10-06T01:35:23.070 + <00000014>object_vector - Created by <00000001>
[00003855] 2024-10-06T01:35:23.070 ~ <00000014>object_vector - Executable "/home/nigel/crunch_component.py" as object process (3855)

To test the arrangement of a component and its supporting service;

$ curl -s 'http://127.0.0.1:5052/MulDiv?a=9.0&b=7.0&c=370.0&d=10.0' | jq '.value[1].value'
100

There is a lot going on in that crunch_component module. Firstly, there is the successful calculation of (9.0 * 7.0) + (370.0 / 10.0) using a concurrent pair of requests from the crunch_component to the crunch_server.

At least as importantly Ansar is providing automated session management. This happens when the crunch_component is first started and connects to the crunch_server, but it also happens if the crunch_server were to be restarted for some reason. A control-c on the crunch_server, a manual restart and a fresh curl command will verify this automation. Note that it can take a few moments for the reconnection to happen.

This is the kind of behaviour required for production quality software. With the adoption of Ansar as the networking toolset, and the adherence to an asynchronous coding style, this difficult behaviour is achieved with almost no related effort on the part of the developer.

Session management in the Ansar library implements exponential backoff and randomization for the scheduling of connection retries. It also uses different time periods depending on whether the connection is within the localhost, across a LAN or over the Internet.
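For readers unfamiliar with the technique, exponential backoff with randomization can be sketched in a few lines. This is a generic illustration and makes no claim about the schedule Ansar actually uses: the retry_delays function, its parameter names, the "full jitter" choice and the per-scope starting values are all assumptions invented for this example.

```python
import random

# Hypothetical starting delays per connection scope, in seconds.
FIRST_DELAY = {'localhost': 0.25, 'lan': 1.0, 'internet': 4.0}

def retry_delays(first=0.5, factor=2.0, ceiling=30.0, attempts=6, rng=None):
    """Yield randomized, exponentially growing delays between retries."""
    rng = rng or random.Random()
    limit = first
    for _ in range(attempts):
        # "Full jitter": pick anywhere between zero and the current limit,
        # so that many clients do not all retry at the same instant.
        yield rng.uniform(0.0, limit)
        # Double the limit for the next attempt, up to a ceiling.
        limit = min(limit * factor, ceiling)

delays = list(retry_delays(first=FIRST_DELAY['lan'], rng=random.Random(1)))
for attempt, seconds in enumerate(delays, start=1):
    print(f'retry {attempt}: wait {seconds:.2f}s')
```

The growing limit keeps a client from hammering a server that is down, while the random element spreads reconnection attempts out when the server returns.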

Attempting to curl while the crunch_server is still down (or the reconnection has not yet happened) will produce an error;

$ curl 'http://127.0.0.1:5052/DivMul?a=9.0&b=7.0&c=370.0&d=10.0'
{
"value": [
"ansar.create.lifecycle.TemporarilyUnavailable",
{
"condition": "required services \"a, b\" are temporarily unavailable",
"explanation": "try again later",
"unavailable": [
"a",
"b"
]
},
[]
]
}

The HTTP request reached the Component_DivMul handler and then encountered ar.roll_call. This function checks that a set of network addresses are currently connected. Failing that check results in the return of a TemporarilyUnavailable fault that captures the details of the failure. That fault is delivered to the curl command as an HTTP response and curl prints the application/json body.

Two styles of usage are accepted by roll_call. The most concise is to pass the self.table, which contains all the latest network address information courtesy of the Component_GroupUpdate handler. The second style is to specify each address within self.table that is significant to the request that is about to occur, using the named parameter feature of Python.

The first style takes the least coding effort, but at runtime checks all the addresses including any irrelevant ones. The names of missing connections appearing in the fault are also the names of the members found in the self.table. Using the second style requires a little more effort, performs only the necessary checks and gives control over the names that appear in the fault.
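The difference between the two styles can be seen in a plain-Python imitation. The roll_call, Table and TemporarilyUnavailable definitions below are simplified stand-ins written for this example, not the Ansar versions; the sketch assumes that a member set to None means "not currently connected", and the tuples are made-up addresses.

```python
class TemporarilyUnavailable:
    """Stand-in fault recording which named addresses are missing."""
    def __init__(self, unavailable):
        self.unavailable = unavailable
        names = ', '.join(unavailable)
        self.condition = f'required services "{names}" are temporarily unavailable'

class Table:
    """Named addresses; None means not currently connected."""
    def __init__(self, **members):
        self.__dict__.update(members)

def roll_call(table=None, **named):
    # Style 1 checks every member of the table.
    # Style 2 checks only the addresses passed by name.
    members = vars(table) if table is not None else named
    missing = [name for name, address in members.items() if address is None]
    return TemporarilyUnavailable(missing) if missing else None

table = Table(server_a=None, server_b=(1, 2), public=(3,))

by_table = roll_call(table)                              # Style 1.
by_name = roll_call(a=table.server_a, b=table.server_b)  # Style 2.
ready = roll_call(b=table.server_b)                      # Only what is needed.

print(by_table.unavailable)   # ['server_a']
print(by_name.unavailable)    # ['a']
print(ready)                  # None
```

The second style reports the caller's own names (a rather than server_a), which is the control over fault contents described above.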

Adding a seconds parameter to all the steps in a transaction completes a clean implementation of error handling, i.e. for network errors and remote server failures. In the event a connection is lost, the corresponding member value will be set to None and all subsequent calls to ar.roll_call will fail. Any transactions that are already underway will eventually time out.

In Ansar, sending a request to an address that is no longer valid (e.g. a lost connection) is harmless. The message effectively falls on the floor and no error is reported. Where a connection is recovered and there are still active transactions, those transactions will continue to time out. Any attempts to communicate by those transactions will still be using the old, defunct address.

Summary

With fewer than 250 lines of source code, this presentation has shown how to implement a small collection of processes that perform advanced networking operations. These processes are suitable for production deployment, with robust error handling and session management.

The Ansar library supports significant features beyond what has been shown here. These include the ability to self-manage the assignment of network addresses (i.e. zero-conf networking) and more. For a solution to operational issues such as overloading, refer to Writing Python Servers That Know About Service Expectations.


Written by Scott Woods

Tried a few things and one of them happened to be coding. Still trying after 30-plus years. So much to do. Bikes and beaches when I can't look at a screen.
