In der Elektrotechnik und Automatisierung gibt es eine ganze Reihe an Übertragungsfunktionen, die ein Eingangssignal in ein Ausgangssignal umwandeln. Im Folgenden stelle ich einige der Funktionen vor, die ich bei der Erfassung von Signalen durch informationstechnische Systeme häufig bei meiner Arbeit benötige.
Definition Übertragungsbaustein
Ein Übertragungsbaustein hat ein Eingangssignal, einen internen Zustand und ein Ausgangssignal. Bei Eingabe eines Signals wird dabei der interne Zustand aktualisiert. Unabhängig von der Eingabe kann ein Ausgangssignal erzeugt werden. In Pseudo-Code gesprochen würde das etwa so aussehen:
class TransferBlock<S extends State, T extends Signaltype> {
S state = initialState
T cycle( T currentInputValue ) {
processState( currentInputValue )
if( hasNewOutputValue ) {
return newOutputValue
} else {
return null
}
}
}
Dabei wird die cycle-Methode ständig aus einer while-true-Schleife heraus aufgerufen, wobei immer das aktuell anliegende Eingangssignal übergeben wird und ein eventuell entstehendes Ausgabesignal an das übergeordnete System zurückgemeldet wird.
Beispiel aus der Praxis
Wir haben mehrere Maschinen, deren Gesamt-Zustand überwacht werden soll. Nun könnten wir kontinuierlich (= im Zyklustakt der Maschine bzw. in einer while-true-Schleife) den gesamten Zustand der Maschine einlesen und in unserem Überwachungssystem speichern. Dabei könnte der Zustand der Maschine wie folgt dargestellt werden:
class MaschineState {
bool running
int errorCode
int insertedPieces
bool outputtingPiece
double rotationSpeedPreset
double rotationSpeedCurrent
}
Aber damit würden wir potentiell viel zu viel speichern, denn um den Maschinenzustand historisch nachvollziehen zu können, reichen oft weniger Informationen. Daher wenden wir je nach Teilzustand unterschiedliche, manchmal auch verkettete Übertragungsbausteine an, um die Datenrate zu verringern. Hier wäre eine mögliche Konfiguration von Übertragungsbausteinen pro Teilzustand:
- running : Change-Trigger
- errorCode : Filter(Ignore 0) : Change-Trigger
- insertedPieces : Change-Trigger : Aggregate(Sum, 2h)
- outputtingPieces : R-Trigger : Aggregate(Count, 1min) : DeadBand(200, timeout=2h)
- rotationSpeedPreset : Change-Trigger
- rotationSpeedCurrent : DeadBand(10 Hz, timeout=1h) : Scale(from=[0,1000.0], to=[0,1.0])
- active : rotationSpeedCurrent : ThresholdDigital(1)
- speedWarning : rotationSpeedCurrent : HighLowAlarm(h=800, l=50, hh=1500, ll=0.5)
Dabei werden die Bausteine miteinander verkettet: der linke Baustein in der Liste gibt sein Ausgangssignal an den rechten Baustein als Eingangssignal weiter. Das letzte Ausgangssignal wird dann an das übergeordnete System zur weiteren Verarbeitung übergeben, bspw. um es in einer Zeitreihendatenbank zu speichern.
Eine Auswahl an wichtigen Übertragungsbausteinen
Folgend sind einige der Übertragungsbausteine aufgeführt, die ich regelmäßig bei der Arbeit sehe. Zu jedem Baustein sind die Einstellungsmöglichkeiten und die Pseudo-Code-Implementierung der cycle-Funktion von oben angegeben.
Totband
Ein Totband unterdrückt kleinere Schwankungen in dem Eingangssignal.
T cycle(T current) {
if( abs(current - state.last) > settings.threshold
|| (now() - state.lastTime) > settings.timeout ) {
state.last = current
state.lastTime = now()
return current
}
return null
}
threshold | Breite des Totbandes. Werkfluktuationen innerhalb dieser Toleranz werden unterdrückt |
timeout | Nach Ablauf dieser Dauer wird der aktuelle Wert weitergemeldet, auch wenn das Totband den Wert eigentlich unterdrücken würde |
Anmerkung: Es ist theoretisch denkbar, das Totband asymmetrisch zu implementieren, aber dafür ist mir bisher kein Anwendungsfall vorgekommen.
Change-Trigger, R-Trig, F-Trig
Ein Trigger meldet nur Wertänderungen. Wenn mehrfach der gleiche Wert als Eingangssignal empfangen wird, so wird dies unterdrückt. R-Trigger melden dabei nur steigende Flanken (raising flank). Also von 0 auf 1, false nach true. F-Trigger hingegen melden nur fallende Flanken weiter (falling flank).
T cycle(T current) { // change-trigger
if( current != state.last ) {
state.last = current;
return current;
}
return null;
}
T cycle(T current) { // r-trigger
if( current && !state.last ) {
state.last = current;
return current;
}
state.last = current;
return null;
}
T cycle(T current) { // f-trigger
if( !current && state.last ) {
state.last = current;
return current;
}
state.last = current;
return null;
}
Informationstheoretische Anmerkung: Genaugenommen sind Trigger somit ein Spezialfall von Totbändern (bzw. asymmetrischen Totbändern). Anmerkung: Es wäre natürlich ebenfalls möglich, auch Triggern einen Timeout zu geben. Aber das ist mir in der Praxis noch nicht vorgekommen.
Skalierung
Ein Wert wir von einem Eingangsintervall auf ein Ausgangsintervall gemappt. Ein Wert im Intervall [-1;1] könnte so unter Beibehalten der Position bezogen auf das Intervall auf [0;1] gemappt werden. So werden beispielsweise auch Größenordnungen von Einheiten umgerechnet: °C zu °F oder Kilometer zu Meter oder kHz zu Hz.
T cycle(T current) {
current -= stettings.inputInterval.low
current /= (settings.inputInterval.high - settings.inputInterval.low)
current *= (settings.outputInterval.high - settings.outputInterval.low)
current += settings.outputInterval.low
return current
}
inputInterval.low | Unterer Grenzwert des Intervalls in dem Eingangssignale liegen können |
inputInterval.high | Oberer Grenzwert des Intervalls in dem Eingangssignale liegen können |
outputInterval.low | Unterer Grenzwert des Intervalls in dem Ausgangssignale liegen sollen |
outputInterval.high | Oberer Grenzwert des Intervalls in dem Ausgangssignale liegen sollen |
Anmerkung: Für optimale Performance sollte das Verhältnis der Intervallbreiten vorberechnet werden. So spart man sich pro cycle eine Multiplikation.
HighLow-Alarm
Dient dazu, einen Alarmzustand aus einem Analogwert abzuleiten. Am Beispiel eines Kesseldrucks: Bei 0bar Überdruck im Kessel kann man davon ausgehen, dass kein Kesseldruck anliegt. Das würde einen Low-Alarm implizieren. Bei 100bar Kesseldruck hingegen ist in diesem Beispiel die Belastungsgrenze überschritten und ein High-Alarm würde anliegen. Ferner können beliebig viele Abstufungen von High und Low gewählt werden. So könnte ein 2-Stufiger HighLow-Alarm so funktionieren:
- LowLow: 0 bar: Kessel ist undicht
- Low: 0-1 bar: Kessel ist im Standby und hat zu wenig Druck für den Betrieb
- Good: 1-5 bar: Kessel ist betriebsbereit
- High: 5-10 bar: Kessel steht unter Überdruck und sollte entlastet werden
- HighHigh: 10+ bar: Kessel explodiert gleich
Üblich ist ein zweistufiges System mit LL, L, Good, H und HH. Wobei LL der kleinste Wert ist, der physikalisch möglich ist vor Systemversagen, L und H Warnwerte und HH der physikalisch größtmögliche Wert vor Systemversagen.
AlarmType cycle(T current) {
// Reihenfolge der Prüfungen beachten
if( current < settings.LL ) {
return AlarmType.LL
}
if( current < settings.L ) {
return AlarmType.L
}
if( current > settings.HH ) {
return AlarmType.HH
}
if( current > settings.H ) {
return AlarmType.H
}
return AlarmType.Good
}
LL | Low-Low-Grenzwert |
L | Low-Grenzwert |
H | High-Grenzwert |
HH | High-High-Grenzwert |
Down-Sampling
Mittels Sampling wird aus einer unsteten Datenrate eine stetige Datenrate erzeugt. Und üblicherweise auch eine geringere. Man stelle sich ein Eingabesignal aus der Natur vor: die Raumtemperatur. Genau genommen ändert diese sich ständig (= unendlich oft pro Sekunde) oder maximal 1 Mal je Plank-Zeit. Aber kein Sensor könnte je eine so hohe Änderungsrate aufzeichnen - und es ist auch egal, denn wir benötigen diese Fein-Erfassung nicht. Um Änderungen der Raumtemperatur nachzuempfinden brauchen wir - je nach Anwendungsfall - nur einen Wert (ein Sample) pro Millisekunde bis Stunde. Üblicherweise reduziere ich damit einfach die Datenrate, denn Sensoren liefern eigentlich immer sehr stabil mehr Samples als für eine Daueraufzeichnung benötigt.
T cycle(T current) {
if( now() - state.lastTime > settings.samplingInterval ) {
state.lastTime = now()
return current
}
return null
}
samplingInterval | Zeitdauer zwischen zwei Werten bzw. 1/samplingFrequency |
Filter
Ein Filter leitet nur Eingangssignale weiter, die dem Filterkriterium entsprechen. So können beispielsweise +/-Infinity als ungültige Temperaturmesswerte herausgefiltert werden. Solche Werte können entstehen, wenn ein Sensor beschädigt ist oder Messwerte anzeigen möchte, die außerhalb des Erfassungsbereichs liegen.
T cycle(T current) {
if( settings.filter.match(current) ) {
return current
}
return null
}
filter | Filter-Funktion. Einfachere Filter könnten diese Funktion auch direkt implementieren |
Es gibt einige Standard-Filter, die häufig vorkommen. Beispielsweise: (Zahl ist finit), (Zahl ist positiv), (Zahl ist nicht 0), (Schließe folgende Werte aus:...), (Lass nur folgende Werte passieren:...), (Lass nur folgenden Wertbereich passieren:...).
Threshold-Digitalisierer
Wandelt analoge Signale in digitale um, indem Wertebereiche definiert werden, in denen alle Eingangswerte dem gleichen Ausgangswert zugewiesen werden. Üblicherweise gibt es zwei Wertebereiche mit einem Schwellwert dazwischen: alles unter 0,5 wird 0 und alles über 0,5 wird 1.
Tout cycle(Tin current) {
if ( current < settings.threshold ) {
return settings.low
} else {
return settings.high
}
}
settings.threshold | der entscheidende Schwellwert |
settings.low | der Zielwert für Eingaben unterhalb des Threshold |
settings.high | der Zielwert für Eingaben oberhalb des Threshold |
Hinweis: Genau genommen sind HighLow-Alarme ein Spezialfall hiervon.
IgnoreFirst
Häufig will man den ersten (oder die ersten n) gemeldeten Eingangssignale beim Systemstart ignorieren. Dafür gibt es unterschiedliche Gründe, bspw. will man einen Initialzustand definieren oder Duplikatmeldungen bei FailOver in redundanten Clustern vermeiden oder dem System eine gewisse Zeit zum Hochfahren einräumen. Hierfür gibt es oft verschiedene Logik, ab wann das System Signale passieren lässt. In den allermeisten Fällen genügt jedoch das Ignorieren des wirklich ersten Wertes.
T cycle(T current) {
if( ! state.firstPassed ) {
state.firstPassed = true
return null
}
return current
}
Aggregate
Ein Aggregat (engl. aggregate) berechnet einen Gesamt-Wert über einen Zeitraum aus den Eingangssignalen und meldet diesen pro abgelaufenem Zeitraum weiter.
Tout cycle(Tin current) {
timeBucketProgress = now() - state.timeBucketStart
state.result = settings.aggregateFunction(state.result, current, timeBucketProgress)
output = null
if( timeBucketProgress >= settings.timeBucketWidth ) {
output = state.result
state.result = null
state.timeBucketStart = now()
}
return output
}
aggregateFunction | Funktion, die den Aggregatwert aktualisiert |
timeBucketWidth | Zeitspanne des Aggregatzeitbereichs |
Einige Standard-Aggregate sind beispielsweise: Summe, Durchschnitt, Median, Minimum, Maximum, Anzahl Werte.
Entprellen
Entprellt (engl. debounce) werden können nur digitale Signale. Man stelle sich dazu ein Tor vor, das mit Schwung geschlossen wird. Die Türen des Tor prallen dabei ein wenig wieder zurück, nachdem sie aufeinandergeschlagen sind. Dann schwingen sie wieder zurück und fallen endgültig zu. Diese schnelle Zustandsabfolge (initial offen-geschlossen-offen-endgültig geschlossen) sollte aber eigentlich nur die Wertänderung (offen->geschlossen) melden. Das erreicht man durch Entprellen des Signals.
bool cycle(bool current) {
if ( current && state.bounceState == HIGH ) {
return current
}
if ( !current && state.bounceState == LOW ) {
return current
}
if ( state.bounceState != BOUNCING ) {
state.bounceState = BOUNCING
state.bouncingTo = current
state.bounceBegin = now()
}
if ( now() - state.bounceBegin > settings.debounceDuration ) {
state.bounceState = current ? HIGH : LOW
return current
}
switch ( settings.debounceMode ) {
case SAFE: return null
case EAGER: return state.bouncingTo
case LAZY: return !state.bouncingTo
}
}
debounceDuration | übliche Dauer des Prellens dieses Signals |
debounceMode | Steuert das Verhalten während des Prellens:
|