Now you can run the main script. You can copy and paste it from here, but make sure to replace the weight calibration values with the ones from the previous step (they are under `# -------------------- Bottle calibration values --------------------`).
#!/usr/bin/env python3
"""
isslive_monitor.py
ISS urine tank monitor (OLED + console) + bottle scale control (HX711 + 2 pumps)
Changes requested:
- Left-edge artifact line on SH1106:
- Use an X crop + paste method to hide the first columns cleanly (avoids uneven 1–2px stripe).
- Adjust OLED_CROP_LEFT if needed (default 2).
- Pump overtime threshold: 18 seconds (was 60)
- Pump pin change:
- PUMP_FILL_PIN = 27 (GPIO27)
- PUMP_EMPTY_PIN remains GPIO11
- OLED content stays original (no bottle info on OLED), except overtime warning screen.
"""
from __future__ import annotations
import sys
import time
import threading
import traceback
from datetime import datetime, timezone
import RPi.GPIO as GPIO
from lightstreamer.client import LightstreamerClient, Subscription, SubscriptionListener
from luma.core.interface.serial import i2c
from luma.oled.device import sh1106
from PIL import Image, ImageDraw
# -------------------- CONFIG --------------------
# All tunables for the script live here; edit these values to match your wiring.
# Lightstreamer
SERVER_URL = "https://push.lightstreamer.com"
ADAPTER_SET = "ISSLIVE"
ITEM_NAME = "NODE3000005"  # item subscribed to; its "Value" field is handled as the urine tank percentage
FIELDS = ["TimeStamp", "Value"]  # fields requested for the item
# OLED
I2C_PORT = 1
I2C_ADDR = 0x3C
OLED_ROTATE = 0
OLED_CONTRAST = 255
SCREEN_REFRESH_SEC = 1.0  # sleep at the end of every main-loop iteration (seconds)
# SH1106 left-edge artifact handling:
# Many SH1106 modules show an uneven bright stripe on the far-left.
# We draw normally, then crop-left by N pixels and paste back at x=0.
OLED_X_OFFSET = 0 # x position text is drawn at; keep at 0 when using crop method
OLED_CROP_LEFT = 2 # hide 1–4 px; try 2 first, then 3 or 4 if needed
# HX711 (BCM pins)
HX711_DT_PIN = 5 # GPIO5 (Pin 29)
HX711_SCK_PIN = 6 # GPIO6 (Pin 31)
HX711_SAMPLES = 20  # raw readings averaged for each weight measurement
# Bottle calibration / control
BOTTLE_FULL_G = 675.0  # weight in grams that corresponds to a 100 % tank reading
DEADZONE_G = 50.0  # pumps only run while the bottle is more than this many grams off target
# Pumps (BCM pins)
PUMP_FILL_PIN = 27 # GPIO27 (Physical 13) -> adds liquid
PUMP_EMPTY_PIN = 11 # GPIO11 (Physical 23) -> removes liquid
# MOSFET trigger modules are often "active HIGH" (signal HIGH = ON).
ACTIVE_HIGH = True
# Optional: safety minimum time between pump toggles (seconds)
MIN_SWITCH_INTERVAL_S = 0.5
# Pump overtime safety
PUMP_MAX_CONTINUOUS_SEC = 18.0 # <-- changed from 60 to 18 seconds; longest allowed uninterrupted pump run
OVERTIME_RESET_OFF_SEC = 5.0 # pumps must be OFF this long to auto-clear latch
# ------------------------------------------------
def utc_now() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)
def utc_str(dt: datetime | None = None) -> str:
if dt is None:
dt = utc_now()
return dt.strftime("%Y-%m-%d %H:%M:%S")
def clamp(x: float, lo: float, hi: float) -> float:
    """Limit *x* to the closed interval [*lo*, *hi*].

    The lower bound is checked first, so it wins if the bounds are crossed.
    """
    if x < lo:
        return lo
    if x > hi:
        return hi
    return x
# -------------------- Shared ISS State --------------------
class SharedState:
def __init__(self):
self.lock = threading.Lock()
self.value: float | None = None # ISS tank percent
self.iss_ts: str | None = None
self.last_rx_utc: datetime | None = None
def update(self, value: float, iss_ts: str | None):
with self.lock:
self.value = value
self.iss_ts = iss_ts
self.last_rx_utc = utc_now()
def snapshot(self):
with self.lock:
return self.value, self.iss_ts, self.last_rx_utc
STATE = SharedState()
# -------------------- OLED --------------------
def init_oled():
    """Open the SH1106 display on the configured I2C bus and return the device.

    Uses I2C_PORT / I2C_ADDR for the bus, OLED_ROTATE for orientation and
    applies OLED_CONTRAST before handing the device back.
    """
    bus = i2c(port=I2C_PORT, address=I2C_ADDR)
    panel = sh1106(bus, rotate=OLED_ROTATE)
    panel.contrast(OLED_CONTRAST)
    return panel
def oled_render(device, lines: list[str]):
    """Draw *lines* top-to-bottom on the display, one text row every 11 px.

    Args:
        device: Display object exposing ``width``, ``height`` and ``display()``.
        lines: Text rows; the first one is drawn at y=0.
    """
    size = (device.width, device.height)

    # Render into an off-screen 1-bit image first.
    canvas = Image.new("1", size, 0)
    pen = ImageDraw.Draw(canvas)
    for row, text in enumerate(lines):
        pen.text((OLED_X_OFFSET, row * 11), text, fill=1)

    # Crop-left trick: copy everything from x=OLED_CROP_LEFT onward to x=0 of
    # a blank frame, which shifts the picture left by that many pixels.
    if OLED_CROP_LEFT > 0:
        visible = canvas.crop((OLED_CROP_LEFT, 0, size[0], size[1]))
        shifted = Image.new("1", size, 0)
        shifted.paste(visible, (0, 0))
        canvas = shifted

    device.display(canvas)
def build_lines(overtime: bool = False) -> list[str]:
    """Compose the five text rows shown on the OLED.

    Args:
        overtime: When true, return the pump-overtime warning screen
            instead of the tank reading.

    Returns:
        The warning screen, a "Waiting for data" screen while no ISS value
        has been received, or the tank percentage together with the age of
        the last update in seconds.
    """
    title = "ISS Urine Tank"

    if overtime:
        return [title, "", "Pump overtime", "", f"UTC: {utc_str()}"]

    value, _iss_ts, last_rx = STATE.snapshot()
    if value is None:
        return [title, "", "Waiting for data", "", f"UTC {utc_str()}"]

    # Seconds since the last update arrived (0 if no receive time is recorded).
    age = 0
    if last_rx:
        age = int((utc_now() - last_rx).total_seconds())

    return [title, f"{value:.1f} %", "", f"Age: {age}s", f"UTC: {utc_str()}"]
# -------------------- HX711 (bit-banged) --------------------
class HX711:
    """Bit-banged reader for an HX711 load-cell amplifier.

    The chip is clocked by hand over two GPIO lines: ``dt`` (data, input)
    and ``sck`` (clock, output). Pin numbers use BCM numbering.
    """

    def __init__(self, dt_pin: int, sck_pin: int):
        """Configure the GPIO lines and leave the clock idle low.

        Args:
            dt_pin: BCM pin connected to the HX711 data output.
            sck_pin: BCM pin connected to the HX711 clock input.
        """
        self.dt = dt_pin
        self.sck = sck_pin
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.sck, GPIO.OUT, initial=GPIO.LOW)
        GPIO.setup(self.dt, GPIO.IN)
        GPIO.output(self.sck, GPIO.LOW)
        time.sleep(0.001)

    def is_ready(self) -> bool:
        """Return True when the data line is low, i.e. a sample is available."""
        return GPIO.input(self.dt) == 0

    def read_raw(self, timeout_s: float = 1.0) -> int:
        """Clock out one 24-bit sample and return it as a signed integer.

        Args:
            timeout_s: Maximum time to wait for the data line to go low.

        Raises:
            TimeoutError: The data line stayed high for longer than
                *timeout_s*.
        """
        # Monotonic clock: the wait must not be stretched or cut short when
        # the wall clock is stepped (e.g. by NTP after boot).
        deadline = time.monotonic() + timeout_s
        while not self.is_ready():
            if time.monotonic() > deadline:
                raise TimeoutError("HX711 not ready (DT stayed high). Check wiring/power.")
            time.sleep(0.001)

        # Shift in 24 bits, most significant bit first.
        value = 0
        for _ in range(24):
            GPIO.output(self.sck, GPIO.HIGH)
            value = (value << 1) | (1 if GPIO.input(self.dt) else 0)
            GPIO.output(self.sck, GPIO.LOW)

        # 25th pulse => gain 128
        GPIO.output(self.sck, GPIO.HIGH)
        GPIO.output(self.sck, GPIO.LOW)

        # Two's-complement sign extension from 24 bits.
        if value & 0x800000:
            value -= 1 << 24
        return value

    def read_average(self, samples: int) -> int:
        """Return the mean of *samples* raw readings, truncated toward zero.

        Args:
            samples: Number of readings to take; must be at least 1.

        Raises:
            ValueError: *samples* is smaller than 1.
            TimeoutError: Propagated from :meth:`read_raw`.
        """
        if samples < 1:
            raise ValueError(f"samples must be >= 1, got {samples!r}")
        total = sum(self.read_raw() for _ in range(samples))
        return int(total / samples)
# -------------------- Bottle calibration values --------------------
# Calibration of the bottle scale: replace both values with the ones measured
# for your own load cell.
HX711_OFFSET = -671229  # raw reading that maps to 0 g
HX711_SCALE = -1060.423246  # raw units per gram


def bottle_grams_from_raw(raw: int) -> float:
    """Convert a raw HX711 reading into grams using the calibration above.

    Returns 0.0 when HX711_SCALE is zero, so no division by zero can occur.
    """
    return (raw - HX711_OFFSET) / HX711_SCALE if HX711_SCALE != 0 else 0.0
def bottle_percent_from_grams(g: float) -> float:
    """Express weight *g* as a percentage of BOTTLE_FULL_G, limited to 0–100."""
    fraction = g / BOTTLE_FULL_G
    return clamp(fraction * 100.0, 0.0, 100.0)
# -------------------- Pump control --------------------
class PumpController:
    """Drive the fill and empty pumps with rate limiting and an overtime cut-out.

    The controller is in one of three states: ``"OFF"``, ``"FILL"`` or
    ``"EMPTY"``. Two safety rules are enforced:

    * A state change is ignored if the previous change happened less than
      MIN_SWITCH_INTERVAL_S seconds ago.
    * If a pump stays in the same running state for PUMP_MAX_CONTINUOUS_SEC
      seconds, both pumps are switched off and an "overtime" latch is set.
      While latched every request is forced to OFF; the latch clears itself
      once the pumps have been off for OVERTIME_RESET_OFF_SEC seconds.

    All timing uses ``time.monotonic()`` so wall-clock adjustments cannot
    shorten or extend the limits. Public methods take ``self.lock``.
    """

    def __init__(self, fill_pin: int, empty_pin: int, active_high: bool = True):
        """Configure both pump pins as outputs and make sure the pumps are off.

        Args:
            fill_pin: GPIO pin of the pump that adds liquid.
            empty_pin: GPIO pin of the pump that removes liquid.
            active_high: True if a HIGH level switches a pump on, False if a
                LOW level does.
        """
        self.fill_pin = fill_pin
        self.empty_pin = empty_pin
        self.active_high = active_high
        self.lock = threading.Lock()
        self.state = "OFF"  # OFF / FILL / EMPTY
        # -inf means "never switched", so the first request is never rate-limited.
        self.last_switch = float("-inf")
        self._run_start_ts: float | None = None  # when the current run began
        self.overtime_latched: bool = False
        self._off_since_ts: float | None = None  # when the pumps last went off

        # Only choose a numbering mode if nobody has done so yet; an existing
        # choice made by the caller is left untouched.
        if GPIO.getmode() is None:
            GPIO.setmode(GPIO.BCM)

        # Start the pins at their OFF level so a pump cannot pulse on between
        # setup() and the first explicit write.
        off_level = GPIO.LOW if active_high else GPIO.HIGH
        GPIO.setup(self.fill_pin, GPIO.OUT, initial=off_level)
        GPIO.setup(self.empty_pin, GPIO.OUT, initial=off_level)
        self._write(False, False)

    def _write(self, fill: bool, empty: bool):
        """Set both output pins, translating on/off to the configured polarity."""
        if self.active_high:
            on_level, off_level = GPIO.HIGH, GPIO.LOW
        else:
            on_level, off_level = GPIO.LOW, GPIO.HIGH
        GPIO.output(self.fill_pin, on_level if fill else off_level)
        GPIO.output(self.empty_pin, on_level if empty else off_level)

    def _trip_overtime(self, now: float):
        """Switch everything off and set the overtime latch."""
        self.overtime_latched = True
        self._write(False, False)
        self.state = "OFF"
        self._off_since_ts = now
        self._run_start_ts = None

    def _update_overtime_state(self, now: float):
        """Trip the latch after too long a run, or clear it after the cooldown."""
        if self.state in ("FILL", "EMPTY"):
            if self._run_start_ts is None:
                self._run_start_ts = now
            if (now - self._run_start_ts) >= PUMP_MAX_CONTINUOUS_SEC:
                self._trip_overtime(now)
        else:
            self._run_start_ts = None
            if self._off_since_ts is None:
                self._off_since_ts = now
            if self.overtime_latched and (now - self._off_since_ts) >= OVERTIME_RESET_OFF_SEC:
                self.overtime_latched = False  # auto-clear after cooldown

    def set(self, mode: str):
        """Request a pump state.

        Args:
            mode: ``"OFF"``, ``"FILL"`` or ``"EMPTY"`` (case-insensitive).
                Any other value is ignored.

        The request is dropped while the overtime latch is set (the pumps are
        forced off instead) and when it would change state sooner than
        MIN_SWITCH_INTERVAL_S after the previous change.
        """
        mode = mode.upper()
        if mode not in ("OFF", "FILL", "EMPTY"):
            return
        with self.lock:
            now = time.monotonic()
            self._update_overtime_state(now)
            if self.overtime_latched:
                # Forced OFF while latched
                self._write(False, False)
                self.state = "OFF"
                return

            changed = mode != self.state
            if changed and (now - self.last_switch) < MIN_SWITCH_INTERVAL_S:
                return

            # Re-assert the outputs even when nothing changes.
            self._write(mode == "FILL", mode == "EMPTY")
            if not changed:
                # Repeating the current request must NOT restart the run
                # timer; otherwise a caller that re-sends "FILL" every cycle
                # would keep pushing the overtime deadline into the future.
                return

            if mode == "OFF":
                self._off_since_ts = now
                self._run_start_ts = None
            else:
                self._off_since_ts = None
                self._run_start_ts = now
            self.state = mode
            self.last_switch = now

    def tick(self):
        """Re-evaluate the overtime rules; call this regularly from the main loop."""
        with self.lock:
            self._update_overtime_state(time.monotonic())
            if self.overtime_latched:
                self._write(False, False)
                self.state = "OFF"

    def get(self) -> str:
        """Return the current state: ``"OFF"``, ``"FILL"`` or ``"EMPTY"``."""
        with self.lock:
            return self.state

    def is_overtime(self) -> bool:
        """Return True while the overtime latch is set."""
        with self.lock:
            return self.overtime_latched

    def shutdown(self):
        """Switch both pumps off unconditionally (no rate limiting)."""
        with self.lock:
            self._write(False, False)
            self.state = "OFF"
            self._run_start_ts = None
            self._off_since_ts = time.monotonic()
# -------------------- Lightstreamer listener --------------------
class ISSListener(SubscriptionListener):
    """Subscription listener that copies incoming tank readings into STATE."""

    def onItemUpdate(self, update):
        """Parse the "Value" field of an update and publish it.

        Empty values are skipped silently; values that are not numeric are
        reported with a warning and skipped.
        """
        raw_value = update.getValue("Value")
        iss_ts = update.getValue("TimeStamp")
        if not raw_value:
            return
        try:
            value = float(raw_value)
        except ValueError:
            print(f"[WARN] Non-numeric value: {raw_value!r}", flush=True)
        else:
            STATE.update(value, iss_ts)
            print(f"[{utc_str()}] Urine Tank = {value:.1f}% (ISS TS: {iss_ts})", flush=True)

    def onSubscriptionError(self, code, message):
        """Report a subscription error on the console."""
        print(f"[SUB ERROR] {code}: {message}", flush=True)
# -------------------- Main --------------------
def _raise_keyboard_interrupt(signum, frame):
    """Signal handler that routes SIGTERM into the Ctrl+C shutdown path."""
    raise KeyboardInterrupt


def _control_step(hx, pumps) -> None:
    """Weigh the bottle once and steer the pumps toward the ISS tank level."""
    # Read bottle scale
    try:
        raw = hx.read_average(HX711_SAMPLES)
        g = bottle_grams_from_raw(raw)
        bottle_pct = bottle_percent_from_grams(g)
    except Exception as e:
        # Deliberately broad: whatever went wrong with the scale, never keep
        # pumping without a valid weight.
        pumps.set("OFF")
        print(f"[{utc_str()}] [WARN] HX711 read failed: {e} -> pumps OFF", flush=True)
        return

    # Determine target based on ISS %
    iss_pct, _iss_ts, _last_rx = STATE.snapshot()
    if iss_pct is None:
        pumps.set("OFF")
        print(
            f"[{utc_str()}] Bottle={bottle_pct:.1f}% ({g:.1f} g) | ISS=NA -> pumps OFF",
            flush=True,
        )
        return

    iss_pct = clamp(iss_pct, 0.0, 100.0)
    target_g = (iss_pct / 100.0) * BOTTLE_FULL_G
    diff_g = g - target_g  # + => bottle heavier than target

    # Deadzone control
    if diff_g > DEADZONE_G:
        pumps.set("EMPTY")
    elif diff_g < -DEADZONE_G:
        pumps.set("FILL")
    else:
        pumps.set("OFF")

    # Console output: bottle % required (not on OLED)
    print(
        f"[{utc_str()}] Bottle={bottle_pct:.1f}% ({g:.1f} g) "
        f"| ISS={iss_pct:.1f}% (target {target_g:.1f} g) "
        f"| diff={diff_g:+.1f} g | pump={pumps.get()}",
        flush=True,
    )


def main() -> int:
    """Run the monitor until interrupted.

    Initialises the OLED, the HX711 scale, the pumps and the Lightstreamer
    subscription, then loops: refresh the display, and (unless the pump
    overtime latch is set) weigh the bottle and steer the pumps.

    Returns:
        0 after a clean stop (Ctrl+C or SIGTERM), 2 if the OLED could not be
        initialised, 3 if the HX711 could not be initialised.
    """
    import signal  # only needed here

    # Ensure prints show immediately
    try:
        sys.stdout.reconfigure(line_buffering=True)
    except Exception:
        pass  # best effort: stdout may be replaced by an object without reconfigure()

    print(f"[{utc_str()}] [BOOT] isslive_monitor starting...", flush=True)

    # OLED init
    try:
        device = init_oled()
    except Exception as e:
        print(f"[ERROR] OLED init failed: {e}", file=sys.stderr, flush=True)
        return 2
    oled_render(device, ["ISS Urine Tank", "", "Connecting...", "", f"UTC {utc_str()}"])

    # A service manager stops the script with SIGTERM. Without a handler the
    # process would die without running the `finally` below and could leave
    # a pump output switched on.
    signal.signal(signal.SIGTERM, _raise_keyboard_interrupt)

    pumps = None
    client = None
    sub = None
    last_overtime_print = float("-inf")
    try:
        # Init HX711 and pumps
        try:
            hx = HX711(HX711_DT_PIN, HX711_SCK_PIN)
        except Exception as e:
            print(f"[ERROR] HX711 init failed: {e}", file=sys.stderr, flush=True)
            return 3
        pumps = PumpController(PUMP_FILL_PIN, PUMP_EMPTY_PIN, active_high=ACTIVE_HIGH)

        # Lightstreamer
        client = LightstreamerClient(SERVER_URL, ADAPTER_SET)
        client.connectionOptions.setRequestedMaxBandwidth("1.0")
        sub = Subscription("MERGE", [ITEM_NAME], FIELDS)
        sub.setRequestedSnapshot("yes")
        sub.addListener(ISSListener())
        print(f"[{utc_str()}] [INFO] Connecting to Lightstreamer...", flush=True)
        client.connect()
        client.subscribe(sub)
        print(f"[{utc_str()}] [INFO] Subscribed. Waiting for updates...", flush=True)

        while True:
            pumps.tick()
            overtime = pumps.is_overtime()
            # OLED: show overtime message if active, else original screen
            oled_render(device, build_lines(overtime=overtime))
            if overtime:
                # Pumps are latched off; just report it (at most once a second).
                now = time.monotonic()
                if now - last_overtime_print >= 1.0:
                    print(f"[{utc_str()}] Pump overtime", flush=True)
                    last_overtime_print = now
            else:
                _control_step(hx, pumps)
            time.sleep(SCREEN_REFRESH_SEC)
    except KeyboardInterrupt:
        print("\n[INFO] Stopping...", flush=True)
    finally:
        # Pumps first; the remaining cleanup runs even if that fails.
        try:
            if pumps is not None:
                pumps.shutdown()
        finally:
            if client is not None:
                try:
                    if sub is not None:
                        client.unsubscribe(sub)
                    client.disconnect()
                except Exception as e:
                    print(f"[WARN] Lightstreamer shutdown failed: {e}", file=sys.stderr, flush=True)
            try:
                GPIO.cleanup()
            except Exception as e:
                print(f"[WARN] GPIO cleanup failed: {e}", file=sys.stderr, flush=True)
    return 0
if __name__ == "__main__":
    # SystemExit is not an Exception subclass, so the exit raised below
    # passes straight through the handler; only genuine crashes are logged.
    try:
        sys.exit(main())
    except Exception:
        print("[FATAL] Unhandled exception:", flush=True)
        traceback.print_exc()
        raise
Run the script and troubleshoot the issues.
The last step is to set up the Pi so that it starts the script automatically whenever it restarts. I did not write down how I did this step, but Google and even ChatGPT guided me to the solution.
Now you have a functional ISS sewage monitor, and you can hear when somebody takes a leak on the ISS!