Writing Python Servers That Know About Service Expectations
Overloading And Other Conditions
This is a presentation of a network component. It uses the component described in Python Networking On Steroids as a starting point. It implements features such as;
- API performance monitoring,
- overload detection and mitigation,
- live enable/disable, i.e. Out Of Service,
- live API tuning,
- persistent API configuration.
These are additional to the capabilities and properties inherited from the earlier work;
- fully asynchronous operation,
- multi-phase, distributed transactions,
- concurrency,
- automated connection recovery,
- code clarity.
The server described in A Quality Python Server In 10 Minutes (i.e. the crunch_server) is adopted as a test server for this component. Materials from both referenced articles are expected to be available in a folder. Commands appearing in this article will be executed in that working folder.
There is a single, main component module created in the following section. It references two interface modules (i.e. Python declarations of request and response messages) in the working folder. The component implements a network API and makes requests to the crunch_server in support of that API. The curl command is used to submit test requests to the API and to demonstrate the live management of that API.
Both referenced articles provide important background information. They cover the overall layout of the component module, how to manage multiple inbound and outbound connections and implementation of asynchronous distributed requests.
There is also information regarding deployment to production environments, such as execution as daemon processes, and the automated storage of logging for development and support purposes.
Less Than 200 Lines Of Python
The complete module is shown below. This can be copied to the monitoring_component.py file, in the working folder;
import ansar.connect as ar
from component_api import *
from crunch_api import *

COMPONENT_API = [
    MulDiv,
    DivMul,
]

METERING_API = [
    ar.ApiUpdate,
    ar.ApiShow,
]

# The Component object.
class Component(ar.Point, ar.Stateless):
    def __init__(self, settings):
        ar.Point.__init__(self)
        ar.Stateless.__init__(self)
        self.settings = settings
        self.table = None
        self.group = None
        self.metering = None
        self.file = None

def Component_Start(self, message):
    self.file = ar.File("api-metering", ar.UserDefined(ar.ApiMetering), create_default=True)
    self.metering, _ = self.file.recover()

    crunch_ipp = self.settings.crunch
    public_ipp = self.settings.public
    private_ipp = self.settings.private

    self.table = ar.GroupTable(
        server_a=ar.CreateFrame(ar.ConnectToAddress, crunch_ipp, api_client='/', ansar_server=True),
        server_b=ar.CreateFrame(ar.ConnectToAddress, crunch_ipp, api_client='/', ansar_server=True),
        public=ar.CreateFrame(ar.ListenAtAddress, public_ipp, api_server=COMPONENT_API),
        private=ar.CreateFrame(ar.ListenAtAddress, private_ipp, api_server=METERING_API),
    )
    self.group = self.table.create(self)

def Component_GroupUpdate(self, message):
    self.table.update(message)

def Component_Completed(self, message):
    d = self.debrief(self.return_address)
    if isinstance(d, ar.OnCompleted):
        d(message.value)
        return

    # Table has terminated.
    self.file.store(self.metering)
    self.complete(ar.Aborted())

def Component_Stop(self, message):  # Control-c or software interrupt.
    self.send(message, self.group)

def Component_MulDiv(self, message):
    f = self.metering.out_of_service(message)
    if f:
        self.send(f, self.return_address)
        return

    f = ar.roll_call(self.table)
    if f:
        self.send(f, self.return_address)
        return

    m = self.metering.start_meter(message)
    if isinstance(m, ar.Faulted):
        self.send(m, self.return_address)
        return

    def begin(mul, div, server_a, server_b, return_address, m):
        a = self.create(ar.Concurrently,
            (mul, server_a),
            (div, server_b),
        )
        self.then(a, step_1, return_address, m)

    def step_1(response, return_address, m):
        value = response[0].value + response[1].value
        self.send(Output(value), return_address)
        self.metering.stop_meter(m, log=self)

    mul = Multiply(message.a, message.b)
    div = Divide(message.c, message.d)
    server_a = self.table.server_a
    server_b = self.table.server_b
    return_address = self.return_address
    begin(mul, div, server_a, server_b, return_address, m)

def Component_DivMul(self, message):
    f = self.metering.out_of_service(message)
    if f:
        self.send(f, self.return_address)
        return

    f = ar.roll_call(a=self.table.server_a, b=self.table.server_b)
    if f:
        self.send(f, self.return_address)
        return

    m = self.metering.start_meter(message)
    if isinstance(m, ar.Faulted):
        self.send(m, self.return_address)
        return

    def begin(div, mul, server_a, server_b, return_address, m):
        a = self.create(ar.Concurrently,
            (div, server_a),
            (mul, server_b),
        )
        self.then(a, step_1, return_address, m)

    def step_1(response, return_address, m):
        value = response[0].value + response[1].value
        self.send(Output(value), return_address)
        self.metering.stop_meter(m, log=self)

    div = Divide(message.a, message.b)
    mul = Multiply(message.c, message.d)
    server_a = self.table.server_a
    server_b = self.table.server_b
    return_address = self.return_address
    begin(div, mul, server_a, server_b, return_address, m)

def Component_ApiUpdate(self, message):
    if self.metering.update(message):
        self.file.store(self.metering)
    self.reply(ar.Ack())

def Component_ApiShow(self, message):
    self.reply(self.metering.report(message.request_type))

# Declare the messages expected by the component.
COMPONENT_DISPATCH = [
    ar.Start,
    ar.GroupUpdate,
    ar.Completed,
    ar.Stop,
    COMPONENT_API,
    METERING_API,
]

ar.bind(Component, COMPONENT_DISPATCH)

# Configuration for this executable.
class Settings(object):
    def __init__(self, crunch=None, public=None, private=None):
        self.crunch = crunch or ar.HostPort()
        self.public = public or ar.HostPort()
        self.private = private or ar.HostPort()

SETTINGS_SCHEMA = {
    'crunch': ar.UserDefined(ar.HostPort),
    'public': ar.UserDefined(ar.HostPort),
    'private': ar.UserDefined(ar.HostPort),
}

ar.bind(Settings, object_schema=SETTINGS_SCHEMA)

# Define default configuration and start the server.
factory_settings = Settings(crunch=ar.HostPort('127.0.0.1', 5051),
    public=ar.HostPort('0.0.0.0', 5052),
    private=ar.HostPort('127.0.0.1', 5053),
)

if __name__ == '__main__':
    ar.create_object(Component, factory_settings=factory_settings)
To run this module, use the following command;
$ cd <working folder>
$ source .env/bin/activate
$ python3 monitoring_component.py -dl=DEBUG
[01230467] 2024-10-28T13:40:46.766 + <00000013>SocketSelect - Created by <00000001>
[01230467] 2024-10-28T13:40:46.766 < <00000013>SocketSelect - Received Start from <00000001>
..
Note that the crunch_server from the previously referenced article is expected to be running at 127.0.0.1:5051. There is no significance to the order of startup and both the crunch_server and monitoring_component can be stopped and restarted at any time.
To verify operation of the component, enter the following command in a separate shell;
$ curl -s 'http://127.0.0.1:5052/MulDiv?a=1.0&b=2.0&c=3.0&d=4.0' | jq '.value[1].value'
2.75
The component accepts a connection from curl, receives the MulDiv message, submits Multiply and Divide requests to run concurrently in the crunch_server, processes the responses and sends the final result back to curl. The jq command is used to extract the essential answer from the JSON body of the HTTP response message.
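The flow described above (two requests issued concurrently, then a single combined reply) can be sketched with the standard library alone. This is only an analogy built on asyncio, not the Ansar machinery; the coroutines multiply, divide and mul_div are hypothetical stand-ins for the crunch_server requests and the MulDiv handler.

```python
import asyncio


async def multiply(a: float, b: float) -> float:
    # Stand-in for a Multiply request sent to the crunch_server.
    await asyncio.sleep(0)
    return a * b


async def divide(c: float, d: float) -> float:
    # Stand-in for a Divide request sent to the crunch_server.
    await asyncio.sleep(0)
    return c / d


async def mul_div(a: float, b: float, c: float, d: float) -> float:
    # Run both requests concurrently, then combine the two responses,
    # mirroring what step_1 does in the component module.
    product, quotient = await asyncio.gather(multiply(a, b), divide(c, d))
    return product + quotient


if __name__ == "__main__":
    print(asyncio.run(mul_div(1.0, 2.0, 3.0, 4.0)))  # 2.75
```

With the same inputs as the curl command, the product (2.0) and the quotient (0.75) sum to the 2.75 reported by jq.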
The intentions of the application are captured as the Component class and a set of associated handlers;
- Component_Start: load configuration and start the networking,
- Component_GroupUpdate: ongoing connects and disconnects,
- Component_Stop: control-c termination,
- Component_MulDiv: received a request,
- Component_ApiUpdate: change the operational parameters,
- Component_ApiShow: review the recent API performance.
This short and concise module delivers on a long list of claims. It manages to demonstrate the intractable (e.g. distributed concurrency) while also scoring well on complexity metrics such as levels of indentation and the length of functions.
System Performance And Service Level Expectations
An instance of ApiMetering has been added as the self.metering member in the Component class. The constructor for this class looks like this;
class ApiMetering(ApiTuning):
    def __init__(self, enabled=True, responsiveness=3.0, tunings=None, started=None, metering=None):
The parameters enabled and responsiveness refer to a “current availability” and “expected response time”, respectively. By default an API is expected to be available (i.e. enabled=True) and responses to API requests are expected to be received within 3s (i.e. responsiveness=3.0).
Performance statistics are gathered by marking the start of every request using;
m = self.metering.start_meter(message)
if isinstance(m, ar.Faulted):
    self.send(m, self.return_address)
    return
Completion of each request is marked after the response is sent;
self.send(Output(value), return_address)
self.metering.stop_meter(m, log=self)
Notice the unique identifier (i.e. m) that is created by start_meter and passed to stop_meter. This ensures that the measured time span is for a particular request-response sequence. To view the results of recent metering activity use the following command;
$ curl 'http://127.0.0.1:5053/ApiShow?request_type=component_api.MulDiv'
{
"value": [
"ansar.connect.networking.RequestMetering",
{
"sample": [
{
"average": "0.002826s",
"measured": "0.002826s",
"started_at": "2022-07-29T14:14:40.685764"
}
],
"total": "0.002826s",
"water_mark": [
"2.25s",
"2.7s",
"3s"
]
},
[]
]
}
The ApiShow message is sent to the component on a separate port, one that listens only on the local host interface (i.e. 127.0.0.1:5053). This ensures that only those with login access to the machine can manage the API.
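For scripted monitoring, the same report can be read with the Python standard library instead of by eye. The following is a minimal sketch that parses the response body shown above; the helper names seconds and summarize are hypothetical, and only the plain seconds form of a duration (e.g. 2.25s) is handled, since that is the only form appearing in the example output.

```python
import json

# Response body copied from the ApiShow example above.
BODY = """
{"value": ["ansar.connect.networking.RequestMetering",
  {"sample": [{"average": "0.002826s", "measured": "0.002826s",
               "started_at": "2022-07-29T14:14:40.685764"}],
   "total": "0.002826s",
   "water_mark": ["2.25s", "2.7s", "3s"]},
  []]}
"""


def seconds(text: str) -> float:
    # Convert a value such as "2.25s" to a float number of seconds.
    if not text.endswith("s"):
        raise ValueError(f"unexpected duration: {text!r}")
    return float(text[:-1])


def summarize(body: str) -> dict:
    # The envelope is a three-element list; the second element is the payload.
    type_name, payload, _ = json.loads(body)["value"]
    return {
        "type": type_name,
        "latest_average": seconds(payload["sample"][-1]["average"]),
        "water_marks": [seconds(w) for w in payload["water_mark"]],
    }


if __name__ == "__main__":
    print(summarize(BODY))
```

Comparing latest_average against the first water mark gives a quick indication of how much headroom the API has.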
Load management occurs at the self.metering.start_meter call site. Where recent statistics associated with the specified request hint at a heavily loaded system, an ar.Overloaded fault may be returned.
Each start_meter-stop_meter pair produces an elapsed time that is recorded against the request type. A sliding window of the most recent times is retained (i.e. the sample member in the JSON object above) and the average of those times is used in determining the most appropriate action. That action is either to accept the next received request or to reject it by sending an ar.Overloaded fault back to the client.
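The bookkeeping described above can be illustrated with a small stand-alone class. This is a sketch under assumptions, not the ApiMetering implementation: the class name WindowMeter, the window size of 8 and the optional now parameter (used to make the example deterministic) are all hypothetical.

```python
import itertools
import time
from collections import deque
from typing import Optional


class WindowMeter:
    """Hypothetical stand-in for the start/stop metering of one request type."""

    def __init__(self, window: int = 8):
        self.sample = deque(maxlen=window)  # most recent elapsed times
        self.started = {}                   # token -> start time
        self.tokens = itertools.count(1)

    def start_meter(self, now: Optional[float] = None) -> int:
        # Issue a unique token so that overlapping requests are timed separately.
        token = next(self.tokens)
        self.started[token] = time.monotonic() if now is None else now
        return token

    def stop_meter(self, token: int, now: Optional[float] = None) -> float:
        # Close the span opened by the matching start_meter call.
        began = self.started.pop(token)
        ended = time.monotonic() if now is None else now
        elapsed = ended - began
        self.sample.append(elapsed)  # the oldest reading drops out when full
        return elapsed

    def average(self) -> float:
        return sum(self.sample) / len(self.sample) if self.sample else 0.0
```

Because each request carries its own token, two requests that overlap in time still produce two independent readings, and only the most recent readings influence the average.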
An API is considered to be overloaded once the derived average reaches 75% of the configured responsiveness value, i.e. for the default of 3s this is 2.25s. There are 3 levels of overload at 75–90%, 90–100% and greater than 100% (refer to the water_mark above). The rate of rejections for each level is 10%, 25% and 50% respectively, i.e. while the derived average remains over 3s, every second request will be rejected. Staging the percentage of rejections should moderate the overall behaviour of an API when it finds itself balancing user demands and system resources. Requests continue to be accepted even at the highest level as fresh readings are required to bring the observed average down.
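The arithmetic behind the water marks and the staged rejection can be written out as follows. This is a sketch of the figures quoted above rather than the library's code; the function names are hypothetical, and the use of a random draw to pick which requests are rejected is an assumption (the library may instead count requests).

```python
import random

# (fraction of responsiveness, share of requests rejected), as quoted in the text.
LEVELS = ((0.75, 0.10), (0.90, 0.25), (1.00, 0.50))


def water_marks(responsiveness):
    # Rounded to avoid floating-point noise in the reported marks.
    return [round(fraction * responsiveness, 6) for fraction, _ in LEVELS]


def rejection_rate(average, responsiveness):
    # The highest water mark reached by the average decides the rate.
    rate = 0.0
    for mark, (_, level_rate) in zip(water_marks(responsiveness), LEVELS):
        if average >= mark:
            rate = level_rate
    return rate


def should_reject(average, responsiveness, draw=random.random):
    # Assumption: rejection is decided by a uniform random draw per request.
    return draw() < rejection_rate(average, responsiveness)


if __name__ == "__main__":
    print(water_marks(3.0))          # [2.25, 2.7, 3.0]
    print(rejection_rate(2.8, 3.0))  # 0.25
```

For the default responsiveness of 3s this reproduces the water_mark values in the ApiShow output, and an average of 2.8s falls in the second level.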
A default instance of an ApiMetering (i.e. no parameters specified) can be put to work immediately, as shown in the monitoring_component. All metering for all request types will adopt the global tuning values, enabled=True and responsiveness=3.0. Specialized tuning is supported, either passed as an ApiTuning in a tunings parameter dict, or presented over a network connection as an ApiUpdate message. Alternatively, it is also possible to deploy multiple ApiMetering objects.
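One way to picture the relationship between global values and specialized tunings is a lookup with fallback. The sketch below is purely illustrative: the classes Tuning and Metering are hypothetical, and both the keying of tunings by request type name and the use of None to mean "inherit the global value" are assumptions, not documented behaviour of ApiTuning.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Tuning:
    # None means "not specialized", i.e. fall back to the global value (assumption).
    enabled: Optional[bool] = None
    responsiveness: Optional[float] = None


@dataclass
class Metering:
    enabled: bool = True
    responsiveness: float = 3.0
    tunings: Dict[str, Tuning] = field(default_factory=dict)

    def effective(self, request_type: str) -> Tuning:
        # Merge any specialized tuning over the global values.
        t = self.tunings.get(request_type, Tuning())
        return Tuning(
            enabled=self.enabled if t.enabled is None else t.enabled,
            responsiveness=(self.responsiveness
                            if t.responsiveness is None else t.responsiveness),
        )
```

Under these assumptions, a request type with no entry in tunings simply sees enabled=True and responsiveness=3.0.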
Access control occurs at a separate point in the code;
f = self.metering.out_of_service(message)
if f:
    self.send(f, self.return_address)
    return
If ApiMetering.enabled is set to False, the ar.OutOfService fault is returned to the client.
Live Updates And Persistence
With the monitoring_component accepting component messages at 0.0.0.0:5052 and API control messages at 127.0.0.1:5053, the API can be disabled and the effect observed from a client;
$ curl -s 'http://127.0.0.1:5053/ApiUpdate?enabled=false'
{
"value": [
"ansar.create.lifecycle.Ack",
{},
[]
]
}
$ curl -s -w "\n\nCode: %{response_code}\n" 'http://127.0.0.1:5052/MulDiv?a=1.0&b=2.0&c=3.0&d=4.0'
{
"value": [
"ansar.create.lifecycle.OutOfService",
{
"condition": "API is out of service"
},
[]
]
}
Code: 500
Adding -w "\n\nCode: %{response_code}\n" to the second curl command causes curl to print the HTTP response code. All faults are delivered to the client as HTTP response messages with this generic error code.
Detailed information about a service outage can be passed in the ApiUpdate message;
$ curl -s 'http://127.0.0.1:5053/ApiUpdate?enabled=false&reason=regular%20maintenance&stand_down=2h'
{
"value": [
"ansar.create.lifecycle.Ack",
{},
[]
]
}
This will produce the advisory API is out of service and resuming around 2022-07-30T10:50 UTC (regular maintenance). Resumption is calculated to occur at the current time plus the specified stand_down period. An ApiUpdate that omits the enabled field but specifies the stand_down period and/or the reason can be sent at any time. The advisory will be updated accordingly.
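The resumption calculation can be sketched with the standard datetime module. The function names parse_period and advisory are hypothetical, and the set of unit letters is an assumption; only the 2h form appears in the example. The wording of the advisory follows the text quoted above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed unit letters; only "h" is confirmed by the example.
UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_period(text: str) -> timedelta:
    # Handles a single number followed by one unit letter, e.g. "2h".
    value, unit = float(text[:-1]), text[-1]
    return timedelta(**{UNITS[unit]: value})


def advisory(now: datetime, stand_down: Optional[str] = None,
             reason: Optional[str] = None) -> str:
    text = "API is out of service"
    if stand_down:
        # Resumption is the current time plus the stand-down period.
        resume = now + parse_period(stand_down)
        text += " and resuming around " + resume.strftime("%Y-%m-%dT%H:%M") + " UTC"
    if reason:
        text += f" ({reason})"
    return text


if __name__ == "__main__":
    now = datetime(2022, 7, 30, 8, 50, tzinfo=timezone.utc)
    print(advisory(now, "2h", "regular maintenance"))
```

With neither a stand_down nor a reason, the sketch falls back to the bare condition text seen in the earlier OutOfService response.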
During startup the monitoring_component reads a configuration file in the current working folder. Where the file does not exist, the ar.File object is directed to create the initial image, i.e. create_default=True;
def Component_Start(self, message):
    self.file = ar.File("api-metering", ar.UserDefined(ar.ApiMetering), create_default=True)
    self.metering, _ = self.file.recover()
The file is updated on every change to the API configuration using the ApiUpdate message, and on termination of the component;
def Component_ApiUpdate(self, message):
    if self.metering.update(message):
        self.file.store(self.metering)
    self.reply(ar.Ack())
This arrangement ensures that the operational parameters for the API are preserved across separate executions of the component.
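The recover-or-create-default pattern is not specific to Ansar. The sketch below shows the same idea using only the standard library; ApiConfig, recover and store are hypothetical names, the JSON layout is invented for the example, and the write-then-rename step is a design choice of this sketch rather than a statement about how ar.File works.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class ApiConfig:
    # Hypothetical stand-in for the persisted API settings.
    enabled: bool = True
    responsiveness: float = 3.0


def store(path: Path, config: ApiConfig) -> None:
    # Write to a temporary file and rename it into place, so that a crash
    # part-way through never leaves a truncated configuration file.
    fd, tmp = tempfile.mkstemp(dir=str(path.parent))
    with os.fdopen(fd, "w") as f:
        json.dump(asdict(config), f)
    os.replace(tmp, path)


def recover(path: Path) -> ApiConfig:
    # Create the default image when the file does not exist yet.
    if not path.exists():
        store(path, ApiConfig())
    return ApiConfig(**json.loads(path.read_text()))
```

Calling recover at startup and store after each accepted update mirrors the sequence used in Component_Start and Component_ApiUpdate.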
Summary
There are three operational conditions detected and managed;
- TemporarilyUnavailable: refer to ar.roll_call,
- Overloaded: refer to self.metering.start_meter,
- OutOfService: refer to self.metering.out_of_service.
The first is about the comings and goings of supporting services, e.g. the crunch_server. In the world of Ansar components, any component may crash and recover at any time; the necessary connections will be automatically restored. In the meantime, requests dependent on an absent service will receive the TemporarilyUnavailable fault.
Where a system appears to be dragging its heels, either due to a compromised supporting service or an overloaded local host, an API is able to report useful information to its clients. Rather than users experiencing aggravating silence, they will tend to receive an Overloaded message in a timely fashion. That message gives some indication of where the problem might be. If the problem is due to a transient peak in system load, the dynamic, managed shedding of load gives the system an improved chance at recovery.
Where a system requires human intervention, the curl command can be used to disable the entire API. Users will receive an OutOfService message until the problem is resolved. The API is then re-enabled with a second curl command.
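The order in which the request handlers apply these three checks (out of service first, then the roll call, then metering) can be condensed into a single function. This is a simplified sketch: admit is a hypothetical name, the faults are represented by plain strings, and a missing connection is modelled as None.

```python
from typing import Mapping, Optional


def admit(enabled: bool,
          connections: Mapping[str, Optional[object]],
          overloaded: bool) -> Optional[str]:
    # Return the name of the first fault that applies, or None to accept.
    if not enabled:
        return "OutOfService"
    missing = [name for name, c in connections.items() if c is None]
    if missing:
        return "TemporarilyUnavailable"
    if overloaded:
        return "Overloaded"
    return None
```

Checking in this order means a deliberately disabled API always reports OutOfService, even when a supporting service is also absent.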
The current state of the API is always available;
$ curl -s 'http://127.0.0.1:5053/ApiShow?'
{
"value": [
"ansar.connect.networking.ApiMetering",
{
"counter": 2,
"enabled": true,
"metering": [
[
"component_api.MulDiv",
{
"sample": [
{
"average": "0.003246s",
"measured": "0.004066s",
"started_at": "2022-08-28T20:07:27.788499"
},
{
"average": "0.003123s",
"measured": "0.002382s",
"started_at": "2022-08-28T20:07:36.418645"
}
],
"total": "0.6448s",
"water_mark": [
"2.25s",
"2.7s",
"3s"
]
}
]
],
"responsiveness": "3s",
"started": [],
"tunings": []
},
[]
]
}
The trailing question mark is the proper way to send an ApiShow message as an HTTP request message, using the application/x-www-form-urlencoded content type. The empty parameter list ensures the message contains a full set of default values.
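When these requests are issued from a script rather than typed by hand, the query string can be assembled with urllib.parse so that values such as the reason text are escaped correctly. The helper api_url is a hypothetical name; the addresses and field names are those used in the curl commands above.

```python
from urllib.parse import quote, urlencode


def api_url(base: str, message: str, **fields) -> str:
    # An empty field set still produces the trailing question mark.
    # quote_via=quote encodes spaces as %20, matching the curl examples.
    query = urlencode(fields, quote_via=quote)
    return f"{base}/{message}?{query}"


if __name__ == "__main__":
    print(api_url("http://127.0.0.1:5053", "ApiShow"))
    print(api_url("http://127.0.0.1:5053", "ApiUpdate",
                  enabled="false", reason="regular maintenance", stand_down="2h"))
```

The first call yields the bare ApiShow? form and the second reproduces the maintenance example with its %20 escape.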