Simulating a Chemical Reaction System with Time-Delayed Response

  1. Overview
  2. Running the Scripts
  3. Code Example 1: Simulating a Chemical Reaction System
  4. Code Example 2: PID Controller for Multiple Pumps and pH Probes

Overview

This script models a chemical reaction whose pH decreases with time (becomes more acidic) with a rate that also decreases with time (slowing kinetics). The system's pH level increases in response to the addition of a base at a specified rate (mL/min). The model is designed to emulate scenarios with extended time constants, which can present control challenges. Adjustments to the PID controllers are made to effectively manage this time-delayed response.

The intent of the simulated model is to reflect the time-dependent response of the chemical system to the addition of a base, particularly focusing on the system's time constant. The time constant is a critical parameter that characterizes how quickly the system reaches a new steady-state following a perturbation, such as the addition of a base. In this model, we implement the time constant to reflect the delayed impact of the base addition on the system's pH level. This is particularly useful for understanding control challenges associated with systems that have inherent time delays or slow response times. The time constant is integrated into the differential equations governing the rate of change in pH, allowing for a more realistic simulation. Detailed implementation of this concept is provided in the subsequent code sections.

Running the Scripts

To successfully run the simulation and control the system, two separate Python scripts must be executed in a sequential manner.

  1. Chemical Reaction Simulation (model.py): First, initiate the simulation by running the model.py script. Execute the following command in your terminal:

    python3 examples/pid/ph/model.py -r 0
    

    The -r 0 flag ensures that the script does not register as a recipe in the Aqueduct system.

  2. PID Control (pid.py): Once the simulation model is up and running, you can launch the PID control script in interactive mode. Use the following command:

    python3 -i examples/pid/pinch_valve/pid.py
    

    The -i flag allows you to interact with the script in real-time after it's been executed, permitting on-the-fly adjustments and queries.

Note: It is essential to start model.py before pid.py to ensure that the simulation environment is properly initialized for the PID control.

pid_controller_ph

Imports

Import standard Python libraries.

import time
import typing
import random
import threading

Reaction Class

The Reaction class simulates a chemical reaction characterized by varying parameters such as the time constant.

class Reaction:
    def __init__(self):
        """Initialize reaction parameters."""
        self.time_constant_s = round(random.uniform(2, 6), 3)
        # ...

Model Class

Manage multiple Reaction instances and connect them to physical devices like pumps and pH probes.

class Model:
    def __init__(self, pumps: typing.List[PeristalticPump], probe: PhProbe):
        """Initialize the model."""
        self.pumps = pumps
        self.ph_probe = probe
        # ...

    def calculate(self):
        """Calculate rate of pH change."""
        # ...

Here's the full code for the model:

"""
Description: This script models a chemical reaction system using threads for parallel calculations.
It simulates the delay in the response of pH with respect to the change in dose rate, thereby
modeling a time constant in the system.
"""
import argparse
import random
import threading
import time
import typing

from aqueduct.core.aq import Aqueduct
from aqueduct.core.aq import InitParams
from aqueduct.devices.base.utils import DeviceTypes
from aqueduct.devices.ph import PhProbe
from aqueduct.devices.pump.peristaltic import PeristalticPump


class Reaction:
    """
    Models a single chemical reaction by defining parameters to capture
    the rate of change of pH as a function of dose rate (mL/min).

    Attributes:
        reaction_start_time: Time when the reaction started.
        dpH_s_dmL_min_start: Initial slope of pH change per mL/min rate.
        delta_change_s: Change in slope of pH change per second.
        delta_change_bounds: Maximum and minimum rate of change.
        roc_offset: Initial offset of the rate of change.
        last_roc: Last calculated rate of change.
        time_constant_s: Time constant to model delay in pH response.
    """

    def __init__(self):
        """Initialize reaction parameters."""
        self.time_constant_s = round(random.uniform(2, 6), 3)
        self.roc_offset = round(random.uniform(-1.95 / 60, -0.95 / 60), 4)
        self.reaction_start_time: float = None

        self.dpH_s_dmL_min_start: float = 0.095  # (pH/s)/(mL/min)
        self.delta_change_s: float = 0.000005  # (pH/s)/(mL/min*s)
        self.delta_change_bounds: tuple = (-0.5, 0.5)

        self.last_roc = None

    def start_reaction(self) -> None:
        """Record the starting time of the reaction."""
        self.reaction_start_time = time.time()

    def calc_rate_of_change(self, ml_min):
        """
        Calculate the rate of change of pH with respect to the dose rate (mL/min).

        Args:
            ml_min: The dosing rate in mL/min.
        """
        # Calculate the time since the reaction started
        reaction_duration_s = time.time() - self.reaction_start_time

        # Calculate rate of change (roc)
        roc = (
            self.roc_offset
            + (self.dpH_s_dmL_min_start + reaction_duration_s * self.delta_change_s)
            * ml_min
        )

        # Constrain roc within bounds
        roc = max(min(roc, self.delta_change_bounds[1]), self.delta_change_bounds[0])
        roc = round(roc, 4)
        self.last_roc = roc


class Model:
    """
    Main class to manage multiple Reactions and link them with physical devices.

    Attributes:
        reactions: List of Reaction objects.
        delayed_roc: List of delayed rate of change values.
        pumps: List of PeristalticPump objects.
        ph_probe: PhProbe object.
        lock: Threading lock to control concurrent access to shared resources.
    """

    def __init__(self, pumps: typing.List[PeristalticPump], probe: PhProbe):
        """Initialize the model."""
        self.pumps = pumps
        self.ph_probe = probe
        self.reactions = [None] * self.ph_probe.len
        self.lock = threading.Lock()

        for i in range(self.ph_probe.len):
            self.reactions[i] = Reaction()
            self.reactions[i].start_reaction()

    def calculate(self):
        """
        Calculate the rate of change in pH for each reaction and
        update the pH probe simulation.
        """
        rates = [p.live[0].ml_min for p in self.pumps]

        def target(i, m):
            """Target function for threading."""
            time.sleep(m.time_constant_s)
            with self.lock:  # Acquire the lock before updating shared data
                m.calc_rate_of_change(rates[i])

        threads = []
        for (i, m) in enumerate(self.reactions):
            t = threading.Thread(target=target, args=(i, m), daemon=True)
            threads.append(t)
            t.start()

        with self.lock:  # Acquire the lock before updating shared data
            self.ph_probe.set_sim_rates_of_change([r.last_roc for r in self.reactions])


if __name__ == "__main__":

    # Parse the initialization parameters from the command line
    params = InitParams.parse()

    # Initialize the Aqueduct instance with the provided parameters
    aq = Aqueduct(
        params.user_id,
        params.ip_address,
        params.port,
        register_process=params.register_process,
    )

    # Perform system initialization if specified
    aq.initialize(params.init)

    # Set a delay between sending commands to the pump
    aq.set_command_delay(0.05)

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "-c",
        "--clear",
        type=int,
        help="clear and create the setup (either 0 or 1)",
        default=1,
    )

    args, _ = parser.parse_known_args()
    clear = bool(args.clear)

    # Define names for devices
    PUMP0_NAME = "PUMP0"
    PUMP1_NAME = "PUMP1"
    PUMP2_NAME = "PUMP2"
    PH_PROBE_NAME = "PH_PROBE"

    if clear:
        # Clear the existing setup and add devices
        aq.clear_setup()

        aq.add_device(DeviceTypes.PERISTALTIC_PUMP, PUMP0_NAME, 1)
        aq.add_device(DeviceTypes.PERISTALTIC_PUMP, PUMP1_NAME, 1)
        aq.add_device(DeviceTypes.PERISTALTIC_PUMP, PUMP2_NAME, 1)
        aq.add_device(DeviceTypes.PH_PROBE, PH_PROBE_NAME, 3)

    # Retrieve the setup to confirm the added devices
    aq.get_setup()

    # Retrieve device instances
    pump0: PeristalticPump = aq.devices.get(PUMP0_NAME)
    pump1: PeristalticPump = aq.devices.get(PUMP1_NAME)
    pump2: PeristalticPump = aq.devices.get(PUMP2_NAME)
    ph_probe: PhProbe = aq.devices.get(PH_PROBE_NAME)

    ph_probe.set_sim_noise([0.0001, 0.0005, 0.0001])
    ph_probe.set_sim_rates_of_change([-0.1, -0.1, -0.1])

    # Create an instance of the PressureModel
    model = Model([pump0, pump1, pump2], ph_probe)

    # Continuous pressure calculation loop
    while True:
        model.calculate()
        time.sleep(0.5)

PID Control for Multiple Pumps and pH Probes

This script illustrates the setup of PID controllers to govern pump behavior based on pH measurements, enabling precise control of complex systems.

Initialization and Device Setup

Initialize the Aqueduct instance and retrieve device instances.

params = InitParams.parse()
aq = Aqueduct(params.user_id, params.ip_address, params.port)
# Retrieve device instances
pump0: PeristalticPump = aq.devices.get(PUMP0_NAME)

PID Controllers Setup

Configure PID controllers for each pump, using pH measurements as the process value.

controllers = []
for (i, pump) in enumerate([pump0, pump1, pump2]):
    process = ph_probe.to_pid_process_value(index=i)
    # ...
    p = Pid(8)
    p.add_schedule(sched)

Here's the full code for PID control:

"""
Demonstration of setting up a PID controller with Aqueduct devices.
"""
# Import necessary modules
from aqueduct.core.aq import Aqueduct
from aqueduct.core.aq import InitParams
from aqueduct.core.pid import Pid
from aqueduct.core.pid import Schedule
from aqueduct.core.pid import ScheduleConstraints
from aqueduct.core.pid import ScheduleParameters
from aqueduct.devices.ph import PhProbe
from aqueduct.devices.pump.peristaltic import PeristalticPump

# Parse the initialization parameters from the command line
params = InitParams.parse()

# Initialize the Aqueduct instance with the provided parameters
aq = Aqueduct(params.user_id, params.ip_address, params.port)

# Perform system initialization if specified
aq.initialize(params.init)

# Set a delay between sending commands to the pump
aq.set_command_delay(0.05)

# Define names for devices
PUMP0_NAME = "PUMP0"
PUMP1_NAME = "PUMP1"
PUMP2_NAME = "PUMP2"
PH_PROBE_NAME = "PH_PROBE"

# Retrieve the setup to confirm the added devices
aq.get_setup()

# Retrieve device instances
pump0: PeristalticPump = aq.devices.get(PUMP0_NAME)
pump1: PeristalticPump = aq.devices.get(PUMP1_NAME)
pump2: PeristalticPump = aq.devices.get(PUMP2_NAME)
ph_probe: PhProbe = aq.devices.get(PH_PROBE_NAME)

controllers = []

for (i, pump) in enumerate([pump0, pump1, pump2]):
    # Define PID controller parameters
    process = ph_probe.to_pid_process_value(index=i)
    control = pump.to_pid_control_output(index=0)
    p = Pid(8)

    # Define multiple schedules with different controller settings
    for integral_valid, dead_zone in [
        (0.3, 0.0005),
    ]:
        params = ScheduleParameters()
        params.kp = 1
        params.ki = 0.025
        params.kd = 0.55
        params.dead_zone = dead_zone
        params.integral_valid = integral_valid
        constraints = ScheduleConstraints()
        sched = Schedule(params, constraints)
        p.add_schedule(sched)

    # Set output limits for the PID controller
    p.output_limits = (0.0, 1)

    # Create a PID controller instance using Aqueduct
    pid = aq.pid_controller(f"pump{i}", process, control, p)
    controllers.append(pid)

These scripts leverage the Aqueduct library for device management and PID control, offering a comprehensive yet adaptable solution for controlling complex chemical systems.