Programm erstellung Erdbebensensor (ehem. QCN)

Für eigene Projekte, Ideen und Spezielles
Antworten
Benutzeravatar
gargamel
Offline
Beiträge: 379
Registriert: 07 Dez 2020, 16:48
Wohnort: Nettetal-Kaldenkirchen
Hat sich bedankt: 3 mal
Danksagung erhalten: 16 mal
Kontaktdaten:

Programm erstellung Erdbebensensor (ehem. QCN)

#1

Beitrag von gargamel »

Hallo zusammen,
wer von Euch könnte ein PHP Programm schreiben,
das den JoyWarrior 24F14USB Sensor auslesen kann und die möglichkeit hat dieses Ergebnis mit einer Grafik (Erdbebenskala) mittels FTP hoch zu laden.
Oder alternativ das alte QCNlive mit einer FTP-Erweiterung mit Grafik zu ergänzen.
Wäre schön wenn sich einer damit auskennt.
Zuletzt geändert von gargamel am 08 Jan 2026, 23:26, insgesamt 2-mal geändert.
LG
Wilfried

Mitglied beim www.ruhrwetter.de und www.wettersektor.de
Wetterstation: Nettetal-Kaldenkirchen www.wilfried-monika.de

Davis VantagePro2 Active Plus, Wetterdirekt Sirius 300 und Sturmglas, TOA-Blitzortung, Radioaktivmessung
Benutzeravatar
olicat
Offline
Beiträge: 2572
Registriert: 07 Dez 2020, 20:33
Wohnort: Hohen Neuendorf
Hat sich bedankt: 46 mal
Danksagung erhalten: 523 mal
Kontaktdaten:

Re: Programm erstellung Erdbebensensor (ehem. QCN)

#2

Beitrag von olicat »

Hi!

Also ChatGPT behauptet, das (beinahe) zu koennen.
Mein Prompt dazu war:
Schreibe ein PHP Programm, das den JoyWarrior 24F14USB Sensor auslesen kann und die Möglichkeit hat dieses Ergebnis mit einer Grafik (Erdbebenskala) mittels FTP hoch zu laden.
Das Ergebnis sieht vielversprechend aus:
Kurzfassung vorweg: Ein reines PHP-Programm kann den JoyWarrior 24F14USB nicht direkt auslesen, weil PHP keinen nativen Zugriff auf USB/HID-Geräte hat.
Machbar ist es aber sehr gut über eine kleine Hilfsschicht (z. B. Python oder C), die die Sensordaten liest, und PHP übernimmt dann Grafik + FTP-Upload.
...
JoyWarrior Sensor

Python / C Programm (liest USB)

JSON / TXT Datei oder STDOUT

PHP
├─ Berechnung (Magnitude / Intensität)
├─ Grafik (Erdbebenskala als PNG)
└─ FTP Upload
...
Auch fuer die Grafikerstellung und den FTP-Upload gab es sinnvoll aussehende Hinweise.
Versuch doch da mal Dein Glueck?!

Oliverr
Benutzeravatar
gargamel
Offline
Beiträge: 379
Registriert: 07 Dez 2020, 16:48
Wohnort: Nettetal-Kaldenkirchen
Hat sich bedankt: 3 mal
Danksagung erhalten: 16 mal
Kontaktdaten:

Re: Programm erstellung Erdbebensensor (ehem. QCN)

#3

Beitrag von gargamel »

Hallo Ollicat,
ich bin leider nicht der "PHP" Spezialist gar noch mit "Pyhton"
So wie ich es verstehe, müssen dann 2 Betriebssysteme laufen: Windows und Pyhton zumindest für die Programmerstellung.
Ich weiss nicht, ist denn sowas denn viel Arbeit (für dich :oops: ) sowas zu erstellen?
LG
Wilfried

Mitglied beim www.ruhrwetter.de und www.wettersektor.de
Wetterstation: Nettetal-Kaldenkirchen www.wilfried-monika.de

Davis VantagePro2 Active Plus, Wetterdirekt Sirius 300 und Sturmglas, TOA-Blitzortung, Radioaktivmessung
Benutzeravatar
olicat
Offline
Beiträge: 2572
Registriert: 07 Dez 2020, 20:33
Wohnort: Hohen Neuendorf
Hat sich bedankt: 46 mal
Danksagung erhalten: 523 mal
Kontaktdaten:

Re: Programm erstellung Erdbebensensor (ehem. QCN)

#4

Beitrag von olicat »

Hi!

Vermutlich willst Du das Programm unter Windows laufen lassen?
Dann koennte man (wohl) einfach eine EXE aus dem Python-Code generieren.

Mir persoenlich fehlen die grundlegenden Dinge fuer ein derartiges Programmierprojekt:
- Sensor
- Zeit
- Motivation

Wobei die Aufgabenstellung sehr einfach klingt - daher die aufgezeigte Loesung ueber eine automatische KI-Erstellung.
Ich habe die Aufgabenstellung heute auch nochmal den M365 Copilot vor die Fuesse geworfen.
Auch der hat ein auf den ersten Blick sinnvolles Programm erzeugt.
Hier der Prompt:
Schreibe ein Python-Programm, das den JoyWarrior 24F14USB Sensor über HID auslesen kann und die Möglichkeit hat, dieses Ergebnis mit einer Grafik (Erdbebenskala) lokal abzulegen.
Dabei soll sekündlich eine Messung vorgenommen werden und eine Grafik mit dem Verlauf der letzten Minute, der letzten Stunde und der letzten 24 Stunden erzeugt werden.
Optional sollen diese erzeugten Dateien mittels FTP an ein benutzerdefiniertes Ziel hochgeladen werden.
Dieses Python Programm soll möglichst nur Standard-Bibliotheken nutzen und sowohl unter Linux als auch unter Windows laufen.
Ergebnis:

Code: Alles auswählen


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
JoyWarrior 24F14USB – HID-Auslesen (sekündlich), Zeitverlauf-Grafiken (Minute/Stunde/24h) als SVG,
optional FTP-Upload (ftplib, unverschlüsselt). Nur Standardbibliotheken.

Linux:   /dev/hidraw* (hidraw: Roh-Reports)  -> docs.kernel.org/hid/hidraw.html
Windows: ctypes + HID/SetupAPI (HidD_GetHidGuid, HidD_GetInputReport, SetupDi*) -> Microsoft Learn

ACHTUNG: Die "Erdbebenskala" ist schematisch; JoyWarrior ist kein Seismometer.
"""

import os
import sys
import time
import math
import glob
import ftplib
import ctypes
import ctypes.wintypes as wintypes
import datetime as dt
import collections
from typing import Optional, Tuple, Deque, List

# ----------------------------
# Benutzerkonfiguration
# ----------------------------

CONFIG = {
    # Erkennung: Entweder VID/PID setzen (hex) ODER Name-Hints (Produktstring).
    "VENDOR_ID": None,             # z.B. 0x16C0 (falls bekannt)
    "PRODUCT_ID": None,            # z.B. 0x27DB (falls bekannt)
    "PRODUCT_NAME_HINTS": ["joywarrior", "code mercenaries"],

    # HID-Report-Layout (RAW-Reports: 2 Byte/achse, little-endian; anpassen falls abweichend)
    "AXIS_OFFSETS": (0, 2, 4),     # Start-Byte X/Y/Z im Report
    "REPORT_LENGTH": 8,            # typ. 8..64 Bytes; anpassen, falls nötig

    # Bereich ±g passend zur Gerätekonfiguration (±1, ±1.5, ±2, ±3, ±4, ±8, ±16)
    "SENSOR_RANGE_G": 2.0,

    # Mess-/Buffer-Parameter
    "SAMPLE_INTERVAL_S": 1.0,      # 1 Hz
    "BUFFER_SECONDS": 86400,       # 24h

    # Ausgabedateien (SVG)
    "SVG_LAST_MIN": "jw_last_minute.svg",
    "SVG_LAST_HOUR": "jw_last_hour.svg",
    "SVG_LAST_DAY": "jw_last_24h.svg",

    # Optionaler FTP-Upload (unverschlüsselt!)
    "ENABLE_FTP_UPLOAD": False,
    "FTP_HOST": "ftp.example.com",
    "FTP_USER": "user",
    "FTP_PASSWORD": "password",
    "FTP_TARGET_DIR": "/remote/path",
}

# ----------------------------
# Hilfsfunktionen für Skalierung
# ----------------------------

def signed14_from_u16(u16: int) -> int:
    """Wandelt 16-bit Wort in 14-bit signed (Bit 0..13 gültig, Vorzeichenbit 1<<13)."""
    val14 = u16 & 0x3FFF
    if val14 & 0x2000:  # Vorzeichenbit
        val14 -= 0x4000
    return val14

def scale_to_g(s14: int, sensor_range_g: float) -> float:
    """Lineare Skalierung signed-14-bit -> g. Fullscale ~8191."""
    fullscale = (2 ** 13 - 1)  # 8191
    return (s14 / fullscale) * sensor_range_g

# ----------------------------
# Rolling Buffer
# ----------------------------

class RollingBuffer:
    """Ringpuffer für 24h Zeitreihen: ax, ay, az, |a|."""
    def __init__(self, max_seconds: int):
        self.t: Deque[float] = collections.deque(maxlen=max_seconds)
        self.ax: Deque[float] = collections.deque(maxlen=max_seconds)
        self.ay: Deque[float] = collections.deque(maxlen=max_seconds)
        self.az: Deque[float] = collections.deque(maxlen=max_seconds)
        self.amag: Deque[float] = collections.deque(maxlen=max_seconds)

    def append(self, ax_g: float, ay_g: float, az_g: float):
        now = time.time()
        self.t.append(now)
        self.ax.append(ax_g)
        self.ay.append(ay_g)
        self.az.append(az_g)
        self.amag.append(math.sqrt(ax_g*ax_g + ay_g*ay_g + az_g*az_g))

    def slice_seconds(self, secs: int) -> Tuple[List[float], List[float]]:
        """Letzte secs Sekunden: (t_rel_in_s, amag)."""
        if not self.t:
            return [], []
        t_now = self.t[-1]
        idx = [i for i, ti in enumerate(self.t) if (t_now - ti) <= secs]
        t_rel = [self.t[i] - self.t[idx[0]] for i in idx] if idx else []
        amag = [self.amag[i] for i in idx] if idx else []
        return t_rel, amag

# ----------------------------
# SVG-Erzeugung (nur Standardbibliothek, keine externen Pakete)
# ----------------------------

def make_svg_timeseries(t_s: List[float], a_g: List[float], window_label: str, out_file: str) -> None:
    """
    Erzeugt ein schlankes SVG mit Zeitreihe |a|(g) und schematischen PGA-Bändern.
    """
    width, height = 900, 320
    padding = 50
    # Schwellen (schematisch)
    thresholds = [0.01, 0.05, 0.10, 0.20, 0.5, 1.0]
    colors     = ["#9be7ff", "#73d6ff", "#4fc3f7", "#ffb74d", "#ff8a65", "#e57373"]

    # Achsenbereiche
    x_max = max(t_s) if t_s else 60.0
    y_max = max([max(thresholds), (max(a_g) if a_g else 0.0)]) * 1.15

    def x_map(x): return padding + (x / max(1e-9, x_max)) * (width - 2*padding)
    def y_map(y): return height - padding - (y / max(1e-9, y_max)) * (height - 2*padding)

    # Header
    ts = dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    svg = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}">',
        f'<rect x="0" y="0" width="{width}" height="{height}" fill="white"/>',
        f'<text x="{padding}" y="24" font-family="Arial" font-size="16" fill="#111">'
        f'JoyWarrior 24F14USB – Verlauf {window_label} (|a| in g)</text>',
        f'<text x="{padding}" y="{height-10}" font-family="Arial" font-size="11" fill="#555">'
        f'Zeit: {ts} • Hinweis: schematisch, JoyWarrior ist kein Seismometer.</text>',
    ]

    # Hintergrund-Bänder
    y_start = 0.0
    for t, c in zip(thresholds, colors):
        y1 = y_map(y_start)
        y2 = y_map(t)
        h  = y1 - y2
        svg.append(f'<rect x="{padding}" y="{y2}" width="{width-2*padding}" height="{h}" fill="{c}" opacity="0.25"/>')
        y_start = t

    # Achsen
    svg.append(f'<line x1="{padding}" y1="{height-padding}" x2="{width-padding}" y2="{height-padding}" stroke="#000"/>')
    svg.append(f'<line x1="{padding}" y1="{padding}" x2="{padding}" y2="{height-padding}" stroke="#000"/>')
    # Y-Ticks
    for t in thresholds + [round(y_max, 3)]:
        y = y_map(t)
        svg.append(f'<line x1="{padding-5}" y1="{y}" x2="{padding}" y2="{y}" stroke="#000"/>')
        svg.append(f'<text x="{padding-40}" y="{y+4}" font-family="Arial" font-size="11" fill="#000">{t:.3f} g</text>')
    # X-Ticks (5 Ticks)
    for frac in [0.0, 0.25, 0.5, 0.75, 1.0]:
        x = x_map(x_max*frac)
        svg.append(f'<line x1="{x}" y1="{height-padding}" x2="{x}" y2="{height-padding+5}" stroke="#000"/>')
        svg.append(f'<text x="{x-10}" y="{height-padding+20}" font-family="Arial" font-size="11" fill="#000">{(x_max*frac):.0f}s</text>')

    # Kurve
    if t_s and a_g:
        points = " ".join([f"{x_map(x)},{y_map(y)}" for x, y in zip(t_s, a_g)])
        svg.append(f'<polyline points="{points}" fill="none" stroke="#0b3d91" stroke-width="1.5"/>')
        pga = max(a_g)
        y_pga = y_map(pga)
        svg.append(f'<line x1="{padding}" y1="{y_pga}" x2="{width-padding}" y2="{y_pga}" stroke="red" stroke-dasharray="5,4"/>')
        svg.append(f'<text x="{width-padding-160}" y="{y_pga-6}" font-family="Arial" font-size="11" fill="red">PGA: {pga:.4f} g</text>')

    svg.append('</svg>')
    with open(out_file, "w", encoding="utf-8") as f:
        f.write("\n".join(svg))

# ----------------------------
# HID – Linux (hidraw)
# ----------------------------

class LinuxHIDRaw:
    """
    Öffnet ein /dev/hidraw* Gerät, dessen Produktname 'JoyWarrior' o.ä. enthält.
    Liest Input-Reports blockierend mit Timeout.

    Hintergrund zu hidraw (Roh-Reports, erstes Byte ggf. Report-ID): Linux-Kernel-Doku.  # [1](https://docs.kernel.org/hid/hidraw.html)
    """
    def __init__(self, report_len: int, axis_offsets: Tuple[int, int, int], sensor_range_g: float,
                 name_hints: List[str]):
        self.fd: Optional[int] = None
        self.report_len = report_len
        self.axis_offsets = axis_offsets
        self.sensor_range_g = sensor_range_g
        self.name_hints = [s.lower() for s in name_hints]

    def open(self):
        # Heuristik: erstes /dev/hidraw*, das im sysfs-String Produktname-Hints enthält
        for dev in glob.glob("/dev/hidraw*"):
            try:
                # Produktstring nachschlagen:
                # /sys/class/hidraw/hidrawX/device/../../product
                base = os.path.basename(dev)
                prod_path = f"/sys/class/hidraw/{base}/device/../../product"
                if os.path.exists(prod_path):
                    with open(prod_path, "r", encoding="utf-8", errors="ignore") as pf:
                        prod = pf.read().strip().lower()
                        if any(h in prod for h in self.name_hints):
                            self.fd = os.open(dev, os.O_RDONLY | os.O_NONBLOCK)
                            return
            except Exception:
                continue
        raise RuntimeError("JoyWarrior (hidraw) nicht gefunden. Prüfen Sie VID/PID oder udev-Zugriff.")

    def close(self):
        if self.fd is not None:
            try:
                os.close(self.fd)
            except Exception:
                pass
            self.fd = None

    def read_axes_g(self, timeout_ms: int = 500) -> Tuple[float, float, float]:
        """
        Liest EIN Report (blocking mit Timeout) und extrahiert X/Y/Z (2 Byte/achse, LE).
        Falls Ihr Gerät nummerierte Reports nutzt, kann Byte 0 die Report-ID sein.  # [1](https://docs.kernel.org/hid/hidraw.html)
        """
        if self.fd is None:
            raise RuntimeError("hidraw nicht geöffnet.")
        import select
        rlist, _, _ = select.select([self.fd], [], [], timeout_ms / 1000.0)
        if not rlist:
            raise TimeoutError("hidraw: Timeout ohne Report.")
        data = os.read(self.fd, self.report_len)
        if not data or len(data) < max(self.axis_offsets) + 2:
            raise TimeoutError("hidraw: unvollständiger Report.")

        def le16(offset: int) -> int:
            return data[offset] | (data[offset+1] << 8)

        x_g = scale_to_g(signed14_from_u16(le16(self.axis_offsets[0])), self.sensor_range_g)
        y_g = scale_to_g(signed14_from_u16(le16(self.axis_offsets[1])), self.sensor_range_g)
        z_g = scale_to_g(signed14_from_u16(le16(self.axis_offsets[2])), self.sensor_range_g)
        return x_g, y_g, z_g

# ----------------------------
# HID – Windows (ctypes + HID/SetupAPI)
# ----------------------------

class WindowsHID:
    """
    Windows: Geräte-Suche über SetupAPI + HidD_* (hid.dll), Lesen per HidD_GetInputReport
    (liefert den aktuellen Input-Report; für benummerte Reports ggf. erstes Byte = Report-ID).  # [2](https://learn.microsoft.com/en-us/windows-hardware/drivers/hid/hid-api)
    """
    def __init__(self, report_len: int, axis_offsets: Tuple[int, int, int], sensor_range_g: float,
                 name_hints: List[str], vendor_id: Optional[int], product_id: Optional[int]):
        self.report_len = report_len
        self.axis_offsets = axis_offsets
        self.sensor_range_g = sensor_range_g
        self.name_hints = [s.lower() for s in name_hints]
        self.vendor_id = vendor_id
        self.product_id = product_id
        self.handle = None

        # Laden benötigter DLLs
        self.hid = ctypes.WinDLL("hid.dll")
        self.setupapi = ctypes.WinDLL("setupapi.dll")
        self.kernel32 = ctypes.WinDLL("kernel32.dll")

        # GUID der HID-Klasse holen
        self.HidD_GetHidGuid = self.hid.HidD_GetHidGuid
        self.HidD_GetHidGuid.argtypes = [ctypes.POINTER(ctypes.c_byte*16)]
        self.HidD_GetHidGuid.restype = None

        # Produktstring-Funktion
        self.HidD_GetProductString = self.hid.HidD_GetProductString
        self.HidD_GetProductString.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.ULONG]
        self.HidD_GetProductString.restype = wintypes.BOOL  # [3](https://learn.microsoft.com/en-us/windows-hardware/drivers/ddi/hidsdi/nf-hidsdi-hidd_getproductstring)

        # InputReport-Funktion
        self.HidD_GetInputReport = self.hid.HidD_GetInputReport
        self.HidD_GetInputReport.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.ULONG]
        self.HidD_GetInputReport.restype = wintypes.BOOL     # [2](https://learn.microsoft.com/en-us/windows-hardware/drivers/hid/hid-api)

        # CreateFile für Gerätepfad
        self.CreateFile = self.kernel32.CreateFileW
        self.CreateFile.argtypes = [wintypes.LPCWSTR, wintypes.DWORD, wintypes.DWORD,
                                    wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD, wintypes.HANDLE]
        self.CreateFile.restype = wintypes.HANDLE

        # CloseHandle
        self.CloseHandle = self.kernel32.CloseHandle
        self.CloseHandle.argtypes = [wintypes.HANDLE]
        self.CloseHandle.restype = wintypes.BOOL

        # SetupAPI: Geräte-Enumeration (vereinfachte Nutzung)
        self.SetupDiGetClassDevs = self.setupapi.SetupDiGetClassDevsW
        self.SetupDiGetClassDevs.argtypes = [ctypes.POINTER(ctypes.c_byte*16), wintypes.LPCWSTR, wintypes.HWND, wintypes.DWORD]
        self.SetupDiGetClassDevs.restype = wintypes.HANDLE

        self.SP_DEVICE_INTERFACE_DATA = type("SP_DEVICE_INTERFACE_DATA", (ctypes.Structure,), {
            "_fields_": [("cbSize", wintypes.DWORD),
                        ("InterfaceClassGuid", ctypes.c_byte*16),
                        ("Flags", wintypes.DWORD),
                        ("Reserved", ctypes.c_ulonglong)]
        })

        self.SetupDiEnumDeviceInterfaces = self.setupapi.SetupDiEnumDeviceInterfaces
        self.SetupDiEnumDeviceInterfaces.argtypes = [wintypes.HANDLE, wintypes.LPVOID, ctypes.POINTER(ctypes.c_byte*16),
                                                     wintypes.DWORD, ctypes.POINTER(self.SP_DEVICE_INTERFACE_DATA)]
        self.SetupDiEnumDeviceInterfaces.restype = wintypes.BOOL

        self.SP_DEVICE_INTERFACE_DETAIL_DATA_W = type("SP_DEVICE_INTERFACE_DETAIL_DATA_W", (ctypes.Structure,), {
            "_fields_": [("cbSize", wintypes.DWORD),
                        ("DevicePath", wintypes.WCHAR * 260)]
        })

        self.SetupDiGetDeviceInterfaceDetail = self.setupapi.SetupDiGetDeviceInterfaceDetailW
        self.SetupDiGetDeviceInterfaceDetail.argtypes = [wintypes.HANDLE,
                                                         ctypes.POINTER(self.SP_DEVICE_INTERFACE_DATA),
                                                         ctypes.POINTER(self.SP_DEVICE_INTERFACE_DETAIL_DATA_W),
                                                         wintypes.DWORD, ctypes.POINTER(wintypes.DWORD), wintypes.LPVOID]
        self.SetupDiGetDeviceInterfaceDetail.restype = wintypes.BOOL

        self.SetupDiDestroyDeviceInfoList = self.setupapi.SetupDiDestroyDeviceInfoList
        self.SetupDiDestroyDeviceInfoList.argtypes = [wintypes.HANDLE]
        self.SetupDiDestroyDeviceInfoList.restype = wintypes.BOOL

    def open(self):
        # GUID holen
        guid_buf = (ctypes.c_byte * 16)()
        self.HidD_GetHidGuid(ctypes.byref(guid_buf))

        DIGCF_PRESENT = 0x00000002
        DIGCF_DEVICEINTERFACE = 0x00000010
        hdevinfo = self.SetupDiGetClassDevs(ctypes.byref(guid_buf), None, None,
                                            DIGCF_PRESENT | DIGCF_DEVICEINTERFACE)
        if hdevinfo == wintypes.HANDLE(-1).value:
            raise RuntimeError("SetupDiGetClassDevs fehlgeschlagen.")

        try:
            index = 0
            while True:
                did = self.SP_DEVICE_INTERFACE_DATA()
                did.cbSize = ctypes.sizeof(self.SP_DEVICE_INTERFACE_DATA)
                ok = self.SetupDiEnumDeviceInterfaces(hdevinfo, None, ctypes.byref(guid_buf), index, ctypes.byref(did))
                if not ok:
                    break

                # Detail mit Pfad
                required = wintypes.DWORD()
                detail = self.SP_DEVICE_INTERFACE_DETAIL_DATA_W()
                detail.cbSize = ctypes.sizeof(wintypes.DWORD) + ctypes.sizeof(wintypes.WCHAR)  # Windows-Anforderung
                self.SetupDiGetDeviceInterfaceDetail(hdevinfo, ctypes.byref(did), ctypes.byref(detail),
                                                     ctypes.sizeof(detail), ctypes.byref(required), None)
                path = detail.DevicePath

                # Gerät öffnen
                GENERIC_READ = 0x80000000
                GENERIC_WRITE = 0x40000000
                FILE_SHARE_READ = 0x00000001
                FILE_SHARE_WRITE = 0x00000002
                OPEN_EXISTING = 3
                FILE_ATTRIBUTE_NORMAL = 0x00000080

                handle = self.CreateFile(path, GENERIC_READ | GENERIC_WRITE,
                                         FILE_SHARE_READ | FILE_SHARE_WRITE,
                                         None, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, None)
                if handle == wintypes.HANDLE(-1).value:
                    index += 1
                    continue

                # Produktstring holen und prüfen
                buf = ctypes.create_unicode_buffer(256)
                if self.HidD_GetProductString(handle, buf, ctypes.sizeof(buf)):
                    prod = buf.value.lower()
                    match_name = any(h in prod for h in self.name_hints)
                else:
                    prod = ""
                    match_name = False

                # VID/PID prüfen (optional): aus Pfad (…vid_XXXX&pid_YYYY…)
                match_vidpid = True
                if self.vendor_id and self.product_id:
                    s = path.lower()
                    try:
                        vid_pos = s.index("vid_")
                        pid_pos = s.index("pid_")
                        vid_hex = s[vid_pos+4:vid_pos+8]
                        pid_hex = s[pid_pos+4:pid_pos+8]
                        match_vidpid = (int(vid_hex, 16) == self.vendor_id and int(pid_hex, 16) == self.product_id)
                    except Exception:
                        match_vidpid = False

                if match_name or match_vidpid:
                    self.handle = handle
                    return
                else:
                    self.CloseHandle(handle)
                    index += 1

        finally:
            self.SetupDiDestroyDeviceInfoList(hdevinfo)

        raise RuntimeError("JoyWarrior (Windows HID) nicht gefunden.")

    def close(self):
        if self.handle:
            try:
                self.CloseHandle(self.handle)
            except Exception:
                pass
            self.handle = None

    def read_axes_g(self, timeout_ms: int = 500) -> Tuple[float, float, float]:
        """
        Holt EINEN aktuellen Input-Report via HidD_GetInputReport (blockiert kurz).
        Hinweis: Bei nummerierten Reports kann das erste Byte die Report-ID sein.  # [2](https://learn.microsoft.com/en-us/windows-hardware/drivers/hid/hid-api)
        """
        if not self.handle:
            raise RuntimeError("HID nicht geöffnet.")
        # Input-Buffer vorbereiten
        buf = (ctypes.c_ubyte * CONFIG["REPORT_LENGTH"])()
        # Optional: buf[0] = REPORT_ID (0 bei nicht nummerierten Reports)
        ok = self.HidD_GetInputReport(self.handle, buf, CONFIG["REPORT_LENGTH"])
        if not ok:
            raise TimeoutError("HidD_GetInputReport schlug fehl (kein Report).")

        data = bytes(buf)
        if len(data) < max(CONFIG["AXIS_OFFSETS"]) + 2:
            raise TimeoutError("Unvollständiger Report.")

        def le16(offset: int) -> int:
            return data[offset] | (data[offset+1] << 8)

        x_g = scale_to_g(signed14_from_u16(le16(CONFIG["AXIS_OFFSETS"][0])), CONFIG["SENSOR_RANGE_G"])
        y_g = scale_to_g(signed14_from_u16(le16(CONFIG["AXIS_OFFSETS"][1])), CONFIG["SENSOR_RANGE_G"])
        z_g = scale_to_g(signed14_from_u16(le16(CONFIG["AXIS_OFFSETS"][2])), CONFIG["SENSOR_RANGE_G"])
        return x_g, y_g, z_g

# ----------------------------
# FTP-Upload (Standardbibliothek)
# ----------------------------

def ftp_upload(local_file: str):
    ftp = ftplib.FTP(CONFIG["FTP_HOST"], CONFIG["FTP_USER"], CONFIG["FTP_PASSWORD"])
    # Zielverzeichnis sicherstellen
    try:
        ftp.cwd(CONFIG["FTP_TARGET_DIR"])
    except ftplib.error_perm:
        # rekursiv anlegen
        parts = CONFIG["FTP_TARGET_DIR"].strip("/").split("/")
        cur = ""
        for p in parts:
            cur = f"{cur}/{p}" if cur else p
            try:
                ftp.mkd(cur)
            except ftplib.error_perm:
                pass
        ftp.cwd(CONFIG["FTP_TARGET_DIR"])
    with open(local_file, "rb") as f:
        ftp.storbinary(f"STOR {os.path.basename(local_file)}", f)
    ftp.quit()

# ----------------------------
# Lauflogik
# ----------------------------

def main():
    # HID öffnen, OS-spezifisch
    if os.name == "nt":
        hid_dev = WindowsHID(CONFIG["REPORT_LENGTH"], CONFIG["AXIS_OFFSETS"], CONFIG["SENSOR_RANGE_G"],
                             CONFIG["PRODUCT_NAME_HINTS"], CONFIG["VENDOR_ID"], CONFIG["PRODUCT_ID"])
    elif os.name == "posix":
        hid_dev = LinuxHIDRaw(CONFIG["REPORT_LENGTH"], CONFIG["AXIS_OFFSETS"], CONFIG["SENSOR_RANGE_G"],
                              CONFIG["PRODUCT_NAME_HINTS"])
    else:
        print("Nicht unterstütztes OS.")
        sys.exit(1)

    try:
        hid_dev.open()
        print("HID geöffnet.")
    except Exception as e:
        print("HID-Öffnen fehlgeschlagen:", e)
        sys.exit(1)

    buf = RollingBuffer(CONFIG["BUFFER_SECONDS"])
    sample_interval = CONFIG["SAMPLE_INTERVAL_S"]
    next_t = time.time()

    try:
        while True:
            now = time.time()
            if now < next_t:
                time.sleep(next_t - now)

            try:
                ax_g, ay_g, az_g = hid_dev.read_axes_g(timeout_ms=500)
            except TimeoutError as e:
                # Kein Report bekommen – Null setzen (wir arbeiten sekündlich weiter)
                print("HID-Timeout:", e)
                ax_g = ay_g = az_g = 0.0
            except Exception as e:
                print("HID-Lesefehler:", e)
                break

            buf.append(ax_g, ay_g, az_g)

            # Plots erzeugen
            t1, a1 = buf.slice_seconds(60)
            t2, a2 = buf.slice_seconds(3600)
            t3, a3 = buf.slice_seconds(86400)

            make_svg_timeseries(t1, a1, "letzte Minute", CONFIG["SVG_LAST_MIN"])
            make_svg_timeseries(t2, a2, "letzte Stunde", CONFIG["SVG_LAST_HOUR"])
            make_svg_timeseries(t3, a3, "letzte 24 Stunden", CONFIG["SVG_LAST_DAY"])
            print(f"SVGs aktualisiert: {CONFIG['SVG_LAST_MIN']}, {CONFIG['SVG_LAST_HOUR']}, {CONFIG['SVG_LAST_DAY']}")

            # Optionaler FTP-Upload (unverschlüsselt!)
            if CONFIG["ENABLE_FTP_UPLOAD"]:
                for f in (CONFIG["SVG_LAST_MIN"], CONFIG["SVG_LAST_HOUR"], CONFIG["SVG_LAST_DAY"]):
                    try:
                        ftp_upload(f)
                        print(f"FTP-Upload OK: {f}")
                    except Exception as e:
                        print(f"FTP-Upload fehlgeschlagen ({f}):", e)

            next_t += sample_interval

    except KeyboardInterrupt:
        print("Beendet durch Benutzer.")
    finally:
        hid_dev.close()
        print("HID geschlossen.")

if __name__ == "__main__":
    main()
Dabei werden SVG-Dateien lokal abgelegt, die optional auch per FTP hochgeladen werden koennen.

Oliver
Dateianhänge
demografik.png
demografik.png (18.95 KiB) 97 mal betrachtet
Benutzeravatar
gargamel
Offline
Beiträge: 379
Registriert: 07 Dez 2020, 16:48
Wohnort: Nettetal-Kaldenkirchen
Hat sich bedankt: 3 mal
Danksagung erhalten: 16 mal
Kontaktdaten:

Re: Programm erstellung Erdbebensensor (ehem. QCN)

#5

Beitrag von gargamel »

Hallo Ollicat,
erstmal vielen dank für deine Bemühung.
Ich habe nun diesen Code von dir ins exe machen wollen (per Internet py to exe).
Es klappt leider nicht (endlosschleife)
Wenn das bei dir läuft, so wie ich es sehe, kannst du mir mal die exe dazu senden?
LG
Wilfried

Mitglied beim www.ruhrwetter.de und www.wettersektor.de
Wetterstation: Nettetal-Kaldenkirchen www.wilfried-monika.de

Davis VantagePro2 Active Plus, Wetterdirekt Sirius 300 und Sturmglas, TOA-Blitzortung, Radioaktivmessung
Antworten