Trwa promocja na kursy embedded

Obsługa zależności czasowych

Jakiś czas temu otrzymałem na maila takie pytanie:

Czy jest jakaś elegancka metoda, aby zarządzać zdarzeniami czasowymi w systemie?
Generalnie unikamy delay’ów i odnosimy się np. do zegara systemowego. Aby uruchomić daną komendę/operację/funkcję w konkretnym momencie czasowym używamy IF-ów. Sprawa się komplikuje jeśli chcemy powiązać czasowo różne zdarzenia w systemie.

Oto przykład: chcemy aby:
1. Zdarzenie X było realizowane co 2 sekundy,
2. Niezależne zdarzenie Y co 5 sekund
3. Zdarzenie Z 1 sekundę po zdarzeniu X.
 
Czy masz tu jakiś pomysł jak taki cel można zrealizować prościej, z wykorzystaniem jakiegoś tricku, aby kod stał się prostszy i bardziej czytelny?

W dzisiejszym wpisie omówię dwa podejścia do tego tematu – z RTOSem i bez.

Weź RTOSa!

To jest cytat, a nawet tytuł prezentacji Mateusza Salamona opisującej typową poradę w takich przypadkach. Generalnie jest to dobra porada. W końcu główną ideą RTOSa jest radzenie sobie z obsługą niezależnych tasków.

I faktycznie w RTOSie możemy rozwiązać ten problem bardzo prosto:

task_y(void *params)
{
    while (1)
    {
        rtos_delay_ms(5000);
        handle_y();
    }
}

I mamy task wykonujący zadanie co 5 sekund. Wyzwalanie jednego tasku z drugiego z opóźnieniem również nie jest trudne:

task_x(void *params)
{
    while (1)
    {
        rtos_delay_ms(2000);
        handle_x();
        rtos_sem_give(sem_z);
    }
}

task_z(void *params)
{
    while (1)
    {
        rtos_sem_take(sem_z);
        rtos_delay_ms(1000);
        handle_z();
    }
}

wykorzystujemy semafor wyzwalany przez task X i odbierany przez task Z. RTOS pod spodem obsługuje oddzielne stosy dla każdego tasku, przełączanie kontekstów, delaye, priorytety itd. Możemy też zamiast semafora użyć kolejki i przekazywać dane z tasku X do Z jednocześnie zapewniając synchronizację. We FreeRTOSie na przykład wszystkie muteksy, semafory itp. tak naprawdę są właśnie kolejkami.

Wygoda vs kontrola

Ale jak to zwykle bywa w przypadku dodawania zewnętrznych bibliotek – dodajemy do projektu bardzo dużo kodu. W ten sposób zwiększamy mocno złożoność. Jednocześnie tracimy kontrolę nad tym co się dzieje i często nawet do końca tego nie rozumiemy. W zamian oszczędzamy czas. Ale czasem może to się obrócić przeciwko nam!

Nie ma chyba lepszego przykładu zwiększenia złożoności i utraty kontroli nad tym co się dzieje wewnątrz niż RTOS. W końcu aby obsługiwać taski niezależnie implementuje scheduler, zmiany kontekstu, obsługuje oddzielne stosy dla tasków, ma wiedzę o zawartości rejestrów CPU i tak dalej.

Dostajemy gotowca i nie musimy się tym wszystkim martwić. Ale możemy w ten sposób spowodować problemy w wielu innych miejscach. Na przykład przez wyścigi, deadlocki, konfigurację stosów dla poszczególnych tasków, priorytetów.

Tak samo jest w przypadku każdej innej biblioteki. Kiedy dodajemy zewnętrzny kod do obsługi wyświetlacza, systemu plików, akcelerometru, stos TCPIP, czy cokolwiek innego – też natrafimy na różne problemy tylko, że inne. Musimy podjąć decyzję, czy użycie zewnętrznego kodu oszczędzi nam czas, czy nie.

W tym konkretnym przypadku uważam, że oszczędzi. Jeżeli obsługa kilku niezależnych zadań jest głównym celem systemu – RTOS jest najlepszym rozwiązaniem. A taki projekt z trzema taskami i niezbyt skomplikowanymi zależnościami jest idealny do nauki.

Ale i tak w dalszej części artykułu spróbujemy sobie to zaimplementować bez RTOSa. Również dla celów edukacyjnych.

Rozwiązanie bez RTOSa

Może się wydawać, że zaproponowane przeze mnie rozwiązanie jest dużo bardziej skomplikowane niż to wyżej na RTOSie. A to dlatego, że większość złożoności jest schowane właśnie w kodzie RTOSa. Tak naprawdę moje rozwiązanie jest mocno uproszczone, a wręcz brakuje mu pewnych elementów, które RTOS ma. Tyle tylko, że kiedy piszemy je samemu, od razu widzimy całą tą złożoność.

Próba rozwiązania tego typu zagadnień samemu pomaga nam też lepiej zrozumieć cel i sposób działania RTOSa. Możemy sobie coś takiego raz napisać, a potem już zawsze korzystać z RTOSa, albo jakiejś biblioteki do obsługi eventów.

Pełny kod mojego rozwiązania bez RTOSa znajdziesz tutaj:
https://godbolt.org/z/P746YsMd3

W praktyce ten kod zostałby podzielony na kilka plików, które zaznaczyłem odpowiednimi komentarzami.

W outpucie możesz zobaczyć, że task X wywołuje się co 5 ticków, task Y co 10 ticków, a task Z 2 ticki po tasku X.

Omówię teraz po kolei, co tam się znajduje.

Po pierwsze mamy funkcję main, która imituje upływ czasu, inicjalizuje i obsługuje eventy:

#define MAX_TIME 1000

int main(void)
{
    int32_t time = 0;

    events_init();
    event_x_add();
    event_y_add();
    event_z_add();

    for (time = 0; time < MAX_TIME; time++)
    {
        printf("time: %d\n", time);
        events_process();
    }
}

Obsługa eventów

Obsługę eventów oparłem na wskaźnikach na funkcje. Przechowuję gdzieś listę wszystkich eventów w systemie i cyklicznie wywołuję funkcję przechodzącą przez wszystkie aktywne eventy i wywołującą je kiedy trzeba.

Każdy event jest strukturą:

typedef void (*handler_t)(void *);
typedef uint32_t timeout_t;

struct event
{
    handler_t handler;
    timeout_t timeout;
    bool is_active;
};

Potrzebuję w niej mieć wskaźnik na funkcję, pozostały timeout i flagę wskazującą, czy event jest aktywny. Moja implementacja jest dostosowana do konkretnego zastosowania. W RTOSie struktura do obsługi tasku, czy kolejki miałaby o wiele więcej elementów.

Mam tablicę eventów:

#define MAX_EVENTS 10
struct event events[MAX_EVENTS];

Mam funkcję do wykonywania typowych akcji na pojedynczym evencie, czyli czyszczenia, resetowania:

void event_clear(struct event *e)
{
    e->handler = NULL;
    e->timeout = 0;
    e->is_active = false;
}

err_code_t event_restart(struct event *e, timeout_t t)
{
    if (e == NULL)
    {
        return -1;
    }

    e->timeout = t;

    return 0;
}

Wszystkie akcje chcę wykonywać w dedykowanych funkcjach, żeby inne moduły nie wiedziały o wewnętrznej implementacji eventów.

Mam też funkcje działające na całej tablicy eventów:

struct event * event_find_free(void)
{
    int32_t i = 0;
    struct event *ret = NULL;

    for (i = 0; i < MAX_EVENTS; i++)
    {
        if (!events[i].is_active)
        {
            ret = &events[i];
            break;
        }
    }

    return ret;
}

err_code_t event_add(handler_t h, timeout_t t)
{
    struct event *e = event_find_free();

    if (e == NULL)
    {
        return -1;
    }

    e->handler = h;
    e->timeout = t;
    e->is_active = true;

    return 0;
}

void events_init(void)
{
    int32_t i = 0;

    for (i = 0; i < MAX_EVENTS; i++)
    {
        event_clear(&events[i]);
    }
}

void events_process(void)
{
    int32_t i;

    for (i = 0; i < MAX_EVENTS; i++)
    {
        if (events[i].is_active)
        {
            struct event * e = &events[i];

            e->timeout--;
            if (e->timeout == 0)
            {
                e->handler(e);
            }
        }
    }
}

Mamy więc funkcję czyszczącą całą tablicę, dodającą pojedynczy event i wywołującą funkcję dla każdego eventu.

Można powiedzieć, że to jest nasz odpowiednik schedulera, ale bardzo ubogi. Możemy dodawać funkcje do wykonania z jakimś opóźnieniem.

Task cykliczny

W naszym zadaniu Y jest taskiem cyklicznym. Jego implementacja wygląda tak:

#define EVENT_Y_TIMEOUT 10

void event_y_callback(void *param);
void add_event_y(void);

void event_y_callback(void *param)
{
    printf("event y\n");
    event_restart(param, EVENT_Y_TIMEOUT);
}

void event_y_add(void)
{
    event_add(event_y_callback, EVENT_Y_TIMEOUT);
}

Funkcja event_y_callback zawiera obsługę naszego tasku. W tym wypadku ograniczyłem się do wyprintowania na konsolę. A na samym końcu task restartuje swój timeout. Robi to przy użyciu argumentu param i funkcji do obsługi eventu. Task Y nie musi nic wiedzieć o wewnętrznej implementacji eventów – mamy działającą abstrakcję.

Dodawanie eventu do naszego schedulera jest zrealizowane za pomocą funkcji event_y_add.

Task wyzwalający akcję

W naszym przykładzie task X jest wywoływany cyklicznie, a jednocześnie wyzwala task Z. Jego implementacja musi więc trochę się różnić od tasku Y. Nie chcemy, aby task X wiedział o wszystkich taskach, które wyzwala. Zamiast tego będzie on udostępniał mechanizm subskrypcji. A dodanie konkretnej akcji będzie w gestii tasku Z. Po raz kolejny dbamy o abstrakcję i odpowiedni przepływ zależności. To task Z zależy od X, dlatego on powinien się zapisywać.

Bardzo często robimy odwrotnie i potem ciężko się połapać w kodzie. Dlatego, że bez wnikliwej znajomości systemu nie wiemy dlaczego task X miały wykonywać funkcje dla tasku Z i po rozbudowaniu systemu jeszcze 10 innych tasków.

Mechanizm subskrypcji oparłem o wzorzec projektowy Observer. Często w C zapominamy o wzorcach projektowych, ponieważ C nie jest językiem obiektowym. Ale implementacja na wskaźnikach na funkcje sprawdza się równie dobrze. A przy okazji pomaga zapewnić odpowiednią separację modułów.

Kod do obsługi obserwera zrobiłem w sposób ogólny. Dzięki temu można użyć tych samych funkcji do obsługi różnych list subskrybentów:

typedef void (*notify_fun_t)(void *param);

err_code_t subscribe(notify_fun_t nf, notify_fun_t *sublist, int32_t sub_max)
{
    int32_t i = 0;
    err_code_t ret = -1;

    for (i = 0; i < sub_max; i++)
    {
        if (sublist[i] == NULL)
        {
            printf("subscribed on idx: %d\n", i);
            sublist[i] = nf;
            ret = 0;
            break;
        }
    }

    return ret;
}

void notify(notify_fun_t *sublist, int32_t sub_max, void *param)
{
    int32_t i = 0;

    for (i = 0; i < sub_max; i++)
    {
        if (sublist[i] != NULL)
        {
            sublist[i](param);
        }
    }
}

A co tu się właściwie dzieje?

Mamy wskaźnik na funkcję notify_fun_t. W funkcji subscribe dopisujemy taki wskaźnik do listy. Natomiast funkcja notify przechodzi przez listę i wywołuje wszystkie funkcje wskaźniki na funkcje, jakie znajdzie.

Możemy teraz użyć tych funkcji w obsłudze taska X:

#define EVENT_X_TIMEOUT 5

void event_x_callback(void *param);
void add_event_x(void);

#define SUB_X_MAX 5
notify_fun_t subscribed_x[SUB_X_MAX];

void event_x_callback(void *param)
{
    printf("event x\n");
    event_restart(param, EVENT_X_TIMEOUT);
    notify(subscribed_x, SUB_X_MAX, param);
}

void event_x_add(void)
{
    event_add(event_x_callback, EVENT_X_TIMEOUT);
}

void subscribe_to_x(notify_fun_t nf)
{
    subscribe(nf, subscribed_x, SUB_X_MAX);
}

Dla taska X mamy te same elementy co w tasku Y, czyli funkcje event_x_callback i event_x_add. Mamy również obsługę subskrypcji. A więc jest specjalna tablica subscribed_x przechowująca notify_fun_t oraz funkcje do subskrypcji i notyfikacji z odpowiednimi argumentami.

W tym przypadku również wskaźnik na event przechodzi przez wszystkie funkcje jako void *param i może być wykorzystany w funkcjach obsługujących eventy, jeżeli zajdzie taka potrzeba.

Podsumowując – task X wykona swoje zadanie, zresetuje swój event, a następnie wykona funkcję notify dla wszystkich zapisanych subskrybentów. Nie obchodzi go, co dokładnie te funkcje robią.

Task wyzwalany

Task Z jest wyzwalany przez task X. A jeszcze, żeby nie było tak prosto na początku ma delay. Task Z korzysta z interfejsu do subskrypcji udostępnianego przez task X:

#define EVENT_Z_TIMEOUT 2 + 1

void event_z_callback(void *param);
void add_event_z(void);

void event_z_callback(void *param)
{
    printf("event z\n");
    event_clear(param);
    /*
     *  Nie trigerujemy od nowa, zamiast tego czyscimy event.
     *  Po kolejnym wywolaniu z eventu x utworzy sie od nowa.
     */
}

void event_z_notify(void *param)
{
    printf("event z notify called\n");
    event_add(event_z_callback, EVENT_Z_TIMEOUT);
}

void event_z_add(void)
{
    /* Event Z musi wiedziec o istnieniu eventu x, bo od niego zalezy */
    subscribe_to_x(event_z_notify);
}

Mamy podobnie jak poprzednio funkcje event_z_callback i event_z_add. Jednak ich implementacja się trochę różni. Nasz callback tym razem nie restartuje timeoutu, tylko usuwa się z listy. Event jest od nowa dodawany przez funkcję event_z_notify, którą podaliśmy jako argument podczas subskrypcji do tasku x.

Kiedy wywoła się task X, wywoła się też funkcja notify, task Z zostanie dodany, odczekamy timeout i wywołamy callback dla tasku Z. Następnie task Z zostanie usunięty, poczekamy na kolejne wywołanie tasku X i cykl się zacznie od początku.

W definicji timeoutu dla tasku Z widzimy jedną z trudności własnego implementowania mechanizmów RTOSopodobnych.

#define EVENT_Z_TIMEOUT 2 + 1

Dodanie offsetu 1 to brzydki hack, który zastosowałem na szybko, żeby przykład działał. Nowy event jest dodawany do tablicy jako kolejny element. Pętla obsługująca eventy obsługuje go w tej samej iteracji zmniejszając od razu timeout. W dodatku to zadanie nie jest gwarantowane, bo przy innej kolejności eventów w tablicy task Z może zostać zapisany na wcześniejszy indeks i nie być obsłużony w tej iteracji.

Aby rozwiązać ten problem lepiej musielibyśmy dodać mechanizm gwarantujący, że task zostanie dodany dopiero po przejściu całej pętli w funkcji events_process.

Takich miejsc, gdzie możemy się wywalić implementując samodzielnie mechanizmy obsługi i synchronizacji tasków jest sporo. I dlatego właśnie korzystamy z gotowych RTOSów, które są na bieżąco rozwijane i aktualizowane. Dzięki temu możemy skupić się na pisaniu naszej aplikacji, a nie na pisaniu schedulera, który jak już wspomniałem sam w sobie może być cięższym zadaniem, niż sama aplikacja.

W ten sposób udało nam się dobrnąć do końca omawiania tej przykładowej implementacji. Jak wspominałem – nie jest to rozwiązanie produkcyjne. Raczej pokazuje sposób realizacji różnych mechanizmów, które możemy zastosować w swoim kodzie.

Podsumowanie

Pokazana przykładowa implementacja korzysta ze wskaźników na funkcje, listy eventów i wzorca observer. Wskaźniki na funkcje pomagają nam nie tylko obsłużyć eventy, ale również wprowadzić odpowiednią strukturę do naszego kodu. Poszczególne moduły wiedzą tylko o tych elementach systemu, o których muszą. Dzięki trzymaniu się tych zasad możemy pisać łatwiejszy do utrzymania kod.

Jeżeli obsługa eventów nie jest najważniejszym celem naszej aplikacji i jest potrzebna tylko w kilku miejscach – możemy zrezygnować z takiej dbałości o zachowanie abstrakcji. Wtedy funkcje będą wywoływane bezpośrednio, kod zostanie zaśmiecony zależnościami, ale dla pojedynczych funkcji może nie opłaca się implementować całych mechanizmów.

Natomiast sama implementacja jest jednak dosyć skomplikowana. Widzimy więc wyraźnie ile RTOS przed nami ukrywa złożoności. A w końcu on to robi w jeszcze bardziej skomplikowany (ale jednocześnie bezpieczniejszy) sposób. Dlatego kiedy eventów robi się więcej, a zależności między nimi są coraz bardziej skomplikowane – RTOS staje się nieunikniony.

Na koniec jeszcze przypominam o trwającej promocji na kurs „C dla Zaawansowanych”. Promocja trwa jeszcze do poniedziałku do północy. A tematy wskaźników na funkcje, interfejsów, wzorców projektowych, czy warstw abstrakcji są tam omówione.

6 Comments

  1. Odnośnie kodu „Taska X” – wydaje mi się, że za dużo rzeczy jest przerzucone do niego, a powinno być ukryte w mechanizmie „eventów”.

    Teraz task X:
    – ma w sobie tablicę subskrybentów
    – ma w sobie funkcje obsługującą subskrypcję
    – uruchamia notyfikację

    Osobiście zostawiłbym tylko uruchomienie notyfikacji, ale wtedy też potrzebny jest mechanizm, żeby do obserwatora „zarejestrować” „serwer”, który może wysyłać notyfikacje.

    Załóżmy, że taskX to task pomiaru temperatury i rozsyła notyfikację o nowym pomiarze – w tym rozwiązaniu:
    – wiemy kto się zasubskrybował – a nie potrzebujemy tego/nie powinniśmy wiedzieć
    – jeśli dojdą nowe moduły, które chcą pobierać temperaturę w ten sposób – musimy ten moduł też zmodyfikować – zwiększyć tablicę tutaj

    Jeśli w systemie byłoby wiele tasków podobnych do tego X – ta część kodu będzie copy-paste, a mogłaby być wszyta w mechanizm eventów.

    Tak jak napisałeś – to nie jest rozwiązanie produkcyjne, ale zawsze przy pokazywaniu plusów itp warto wspomnieć o minusach i jaki impact na inne moduły/projekt ma użycie takiego mechanizmu.

    • GAndaLF

      24 września 2021 at 11:02

      Co do kodu taska X – w godbolcie w komentarzu za wszystkimi taskami napisałem (chyba ostatecznie zapomniałem tego dodać w artykule), że są tak jakby trzy klasy tasków. I jakbyśmy dalej generalizowali to byśmy zrobili oddzielne pliki z taskiem cyklicznym, cyklicznym z subskrypcją i triggerowanym. A task X miałby tylko swój handler podstawiany pod funkcje printf i własną tablicę subskrybentów.

      Pod mechanizm eventów bym tego nie podpinał. Zrobiłbym to jako niezależne moduły.

      Co do tablicy subskrybentów to specjalnie ją dałem w tasku X, żeby dało się stworzyć drugi task z subskrypcją korzystający z tego samego mechanizmu. Wtedy lista subskrybentów dla każdego jest jego wewnętrzną sprawą. Możemy jeszcze dyskutować, czy lepiej linkowaną listę i trzymać w tasku X tylko wskaźnik przekazywany do funkcji obsługujących. Ale mi zawsze wystarczył observer na tablicach. Poza tym wtedy też trzeba mieć albo jakiś pool albo malloc albo statyczne obiekty w każdym subskrybującym tasku.

      Tutaj jeszcze kolejna decyzja, czy chcemy kontrolować rozmiar tablicy dla każdego tasku, czy używać globalnego. Ale zawsze musimy albo zrobić mechanizm umożliwiający zapis każdemu albo tablicę. Ja wybieram tablice.

      Kolejna sprawa, że część funkcji z tej implementacji będzie statyczna i nie dostanie się do publicznych interfejsów. Funkcję do subskrybcji bym zostawił w interfejsie dla tasku X. Wtedy w tasku Z widzimy od razu, że zależy od taska X. Printfa w funkcji subskrypcji zrobiłem, żeby pokazać mechanizm na konsoli. Tak naprawdę nie będziemy chcieli tam dodatkowego kodu. To jest tylko wrapper wołający funkcję subscribe z odpowiednimi argumentami.

      • O tym pierwszym też miałem wspomnieć – że może trochę za dużo typów jest obsługiwanych i to rozdzielić (choćby interfejsem innym).

        O tym „Co do tablicy subskrybentów to specjalnie ją dałem w tasku X, żeby dało się stworzyć drugi task z subskrypcją korzystający z tego samego mechanizmu.” nie wspomniałeś we wpisie, a w sumie ciekawy motyw.

        Czyli wszytko wynika z jakiś decyzji – i tak ma być 🙂
        Najgorzej jak potem nie są spisane i nikt nie pamięta „po co to było”, albo jakaś nieprzemyślana rzecz (czyli klepnięcie kodu bez decyzji) ma duży wpływ na projekt i ciężko potem to zmienić/rozbudować. A jak ostatnio bawiłem się prostą biblioteką do regulatora to wyszło mi tam kilkanaście decyzji – np. na pytanie „czy coś wyciągnąć czy wcielić w moduł” chyba każda odpowiedź jest dobra – zależy od kontekstu.

        • GAndaLF

          24 września 2021 at 12:31

          No faktycznie po dokonaniu decyzji trudno się o niej rozmawia bo ją bierzemy za pewnik, a rozmówca może tego nie wiedzieć. Słyszałeś o Architecture Design Records?
          https://github.com/joelparkerhenderson/architecture-decision-record

          W niektórych projektach miałem coś takiego jak decision log robiony w wordzie na zasadzie wolnej amerykanki, ale to jest lepsze bo od razu odpowiednią strukturę wszystkiemu na daje. Oczywiście pod warunkiem, że uzupełniamy 😀

  2. A co zrobić gdy, task cykliczny, który np odczytuje port GPIO musi te dane przekazać do innego taska?

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *