Eventhåndtering i .NET med Reactive Extensions (Rx)

Reactive Extensions for .NET (Rx) er et nylig utgitt bibliotek som i mange situasjoner kan gjøre eventhåndtering og asynkron programmering lettere og mer elegant. Både WinForms, WPF og Silverlight er støttet. Det finnes begrenset med offisiell dokumentasjon, men det er skrevet mange entusiastiske blogginnlegg om temaet (se referansene nederst på siden). Formålet med denne artikkelen er å samle litt av denne informasjonen for å gi en innføring i Rx – både praktiske anvendelser og det teoretiske fundamentet som ligger bak. Leseren bør kunne C# og LINQ, og bør kjenne til Iterator pattern.

Den grunnleggende idéen bak Rx er at en .NET-event kan ses på som en kilde som produserer en sekvens av notifikasjoner (beskjeder om at eventen er blitt utløst) – og hvis en event er en sekvens, burde den kunne manipuleres ved hjelp av LINQ! Dette høres kanskje ut som å tøye strikken litt langt, så la oss se på hvilke karakteristikker en slik notifikasjonssekvens har:

  • Den er ikke “forhåndsutfylt”, men blir fylt ut etterhvert som tiden går og det kommer inn notifikasjoner fra eventen.
  • Lengden er potensielt sett ubegrenset.
  • Vi kan ikke uten videre be om å få neste element fra sekvensen; vi må belage oss på å vente på neste notifikasjon og reagere på den når den kommer. I praksis gjøres dette ved å registrere en event handler.

De to første egenskapene deles faktisk med vanlige itererbare sekvenser, som kan lazy loades og/eller gradvis fylles ut fra en treg datakilde (f.eks. en database) – og det er lov å lage iteratorer som ikke egentlig har noen datakilde, men som bare genererer en sekvens med tall som ikke nødvendigvis stopper noen gang. Det er kun den siste egenskapen som er annerledes: kontrollen ligger hos datakilden, ikke hos oss. Vi skal nå studere litt av teorien bak programmeringsmodellen det hintes til her, før vi viser et par meget spennende anvendelser av det.

Sammenhengen mellom Iterator pattern og Observer pattern

(Diskusjonen i denne seksjonen er delvis basert på [2]. Det er ikke nødvendig (bare interessant) å kjenne til den bakenforliggende teorien, så man kan hoppe rett til anvendelsene hvis man foretrekker dét.

C# implementerer Iterator pattern ved hjelp av to interfaces som her presenteres i en noe nedstrippet utgave:

interface IEnumerable<T> {
    IEnumerator<T> GetEnumerator();
}
 
interface IEnumerator<T> {
    T Current { get; } // Might throw exception
    bool MoveNext();
}

(Siden C# ikke har noen throws-setning, har vi markert med en kommentar at kall til Current kan resultere i en exception.) La oss betrakte sekvensdiagrammet for en typisk anvendelse av disse interfacene:

Sekvensdiagram for Iterator pattern i C#

(I praksis er det nok oftest MoveNext() som kontakter datakilden, men det som illustreres her er en mulig måte å gjøre det på.) Hovedpunktene er:

  1. Vi går til datakilden (som er abstrahert som en IEnumerable<T>) og ber den om å få et objekt vi kan bruke til å interagere med datakilden – nemlig en IEnumerator<T>, som da blir vårt abstrakte “innsynspunkt” til datakilden.
  2. Hver gang vi har lyst på et nytt element fra datakilden, går vi til enumeratoren og ber den om å få det neste elementet. Enumeratoren vil så gå til datakilden, hente neste objekt (av typen T) og gi det til oss.

Dette er et eksempel på en interaktiv programmeringsmodell, fordi det er vi som kontrollerer programflyten ved å bestemme når vi ønsker å få et nytt element. Det motsatte av dette er en reaktiv programmeringsmodell, hvor vår kode i utgangspunktet er passiv, og kun foretar seg noe når den får beskjed om at noe har skjedd. En vanlig måte å modellere dette på er Observer pattern, som i .NET-rammeverket lenge har vært indirekte implementert gjennom støtten for events. Rx gir oss en mer generell implementasjon av Observer pattern, gjennom IObservable<T> og IObserver<T>:

interface IObservable<T> {
    IDisposable Subscribe(IObserver<T> observer);
}
 
interface IObserver<T> {
    void OnNext(T value);
    void OnError(Exception exception);
    void OnCompleted();
}

Kort oppsummert brukes disse interfacene slik:

  1. Vi lager en ny observatør (som må implementere IObserver<T>), som skal fungere som datakildens abstrakte innsynspunkt til vår kode, og gir denne til datakilden, som må markere at den kan observeres ved å implementere IObservable<T>. (Merk at “datakilden” ikke trenger å være en tradisjonell liste eller database; det kan for eksempel være en event eller et annet objekt som det gjentatte ganger kan “skje noe interessant” med.)
  2. Hver gang datakilden produserer et nytt element (eller hver gang det skjer ett eller annet interessant som datakilden mener at observatørene bør vite om), sier den fra til observatørene ved å kalle OnNext(), OnCompleted() eller OnError(), som så kaller en metode i vår kode.

La oss betrakte sekvensdiagrammet for dette:

Sekvensdiagram for Observer pattern i C#

Dette ligner veldig på det forrige sekvensdiagrammet, bare at pilene går den andre veien! Det viser seg faktisk at IObservable<T> og IObserver<T> kan ses på som de “motsatte” interfacene av IEnumerable<T> og IEnumerator<T>:

  • IEnumerable<T>.GetEnumerator() tar ingen parametre og returnerer en IEnumerator, mens IObservable<T>.Subscribe() tar inn en IObserver<T> og hadde i utgangspunktet ikke trengt å returnere noe (men pga. et designmessig valg returerer den en IDisposable; dette blir forklart lenger nede).
  • IEnumerator<T>.Current er en property, som kan sees på som en parameterløs metode. Den vil enten returnere en T eller kaste en exception. Det motsatte av dette vil da være være to void-metoder hvor den ene tar inn en T og den andre tar inn en exception: IObserver<T>.OnNext() og IObserver<T>.OnError().
  • IEnumerator<T>.MoveNext() tar ingen parametre og returnerer en bool som forteller om det finnes flere elementer. Det omvendte skulle da i utgangspunktet være en metode som tar inn en bool som forteller om det kommer flere elementer. Men “det kommer flere elementer” er en overflødig opplysning i en reaktiv modell, fordi vi vet at det neste elementet blir servert til oss via OnNext() når det kommer – derfor kan vi bestemme at metoden bare skal kalles når den ellers ville ha blitt kalt med false for å opplyse om at det ikke kommer flere elementer. Det naturlige navnet blir da IObserver<T>.OnCompleted().

Innenfor matematikken kalles dette fenomenet for dualitet, og den interesserte leser / videoseer kan lære mer i [7] og [8].

Bruk av Rx

Da skulle teorien være på plass, så da er det på tide å vise hva vi kan bruke dette til. Tanken er altså at siden observerbare objekter deler mange karakteristikker med itererbare objekter, bør vi kunne skrive spørringer mot dem.

Merk: all koden vi kommer til å vise her er ment for å plasseres inni constructoren til en Form i et konsollprosjekt som også bruker WinForms – lages enklest ved å lage et nytt WinForms-prosjekt og så gå på Project Properties og si at det er en Console Application. Prosjektet vil trenge referanser til System.Reactive og System.CoreEx.

Aller først må vi skaffe oss et observerbart objekt, helst basert på en event. Observable er en statisk hjelpeklasse som blant annet lar oss gjøre følgende:

var mouseMove = Observable.FromEvent<MouseEventArgs>(this, "MouseMove");

Dette gir oss en IObservable<Event<MouseEventArgs>>, som representerer en strøm av notifikasjoner som vil produseres etterhvert som musen beveges. En Event<E>-instans representerer det vi i denne artikkelen har kalt for notifikasjoner – de opplyser om at en event er blitt utløst, og inneholder objektet eventen ble utløst på og EventArgs-objektet (av den spesifikke typen E) som ville ha blitt sendt inn til eventhåndtereren. Siden LINQ behandler en IObservable<T> som en sekvens av T-instanser, kan vi kjøre spørringer mot mouseMove for å trekke ut data fra notifikasjonene som kommer til å komme:

var locations = from mm in mouseMove
                select new { mm.EventArgs.X, mm.EventArgs.Y };

Dette vil gi en ny IObservable, denne gangen med den anonyme typen new { int X, int Y }, som representerer en strøm av x- og y-koordinater. Vi kan nå abonnere på notifikasjoner fra locations ved hjelp av Subscribe(). Dette er en extension method som i utgangspunktet lar oss oppgi en IObserver som vil bli lagt til i det observerbare objektet sin liste over observatører/abonnenter og som dermed vil få beskjed hver gang det skjer noe interessant. Men det ville være strevsomt å implementere IObserver hver gang man skal abonnere på noe, så heldigvis er Subscribe() overloadet til å la oss oppgi et lambdauttrykk for hva vi ønsker at skal skje når det produseres et nytt element (evt. også lambdauttrykk for hva som skal skje når strømmen er slutt eller hvis det oppstår en feil). Dette uttrykket vil automatisk bli pakket inn i en IObserver som så vil bli satt til å abonnere på det observerbare objektet:

locations.Subscribe(pos => Console.WriteLine("{0}, {1}", pos.X, pos.Y));

Nå vil musens x- og y-koordinater bli printet hver gang den beveger seg. Vi kunne også ha abonnert direkte på mouseMove; man ikke benytte LINQ-spørringer til å trekke ut informasjon.

Hvordan kan man “si opp abonnementet”? Det viser seg at Subscribe() returnerer en IDisposable som fungerer slik at Dispose() på denne vil gjennomføre avmeldingen. Dermed kan man enten bruke using for å abonnere i en kort periode, eller man kan ta vare på returverdien fra Subscribe() og melde seg av på et helt annet punkt i programmet (kanskje utløst av en annen event).

Merk: mouseMove og locations i koden ovenfor er altså IObservable-instanser. I utgangspunktet kan en IObservable representere hva som helst, men siden vi (og Rx) benytter det kun til eventhåndtering, kommer vi flere steder til å snakke om IObservable-instanser som “eventstrømmer” – enten de stammer direkte fra en C#-event (slik som mouseMove gjør) eller er avledet av andre IObservable (slik som locations er).

“Extension events”

Som vi ser, er dessverre syntaksen for å få tak i en IObservable for en event ganske strevsom. Som beskrevet i [3] kan man kapsle inn dette i extension methods langs disse linjene:

internal static class ExtensionMethods {
    public static IObservable<IEvent<MouseEventArgs>> GetMouseMove(this Control control) {
        return Observable.FromEvent<MouseEventArgs>(control, "MouseMove");
    }
}

Parametertypen bør være den mest generelle typen som har den aktuelle eventen (dette blir først kontrollert når funksjonen kalles), og pass på at EventArgs-typen er riktig for den aktuelle eventen. Merk at mange blogginnlegg antar at leserne vet at dette er noe som bør gjøres, og benytter f.eks. GetMouseMove() uten eksplisitt å definere den noen plass, slik at man kan få inntrykk av at det er en Rx-definert metode.

Kombinering og timing av events

Det vi har vist hittil ser bare ut som en marginalt mer kompakt syntaks for å abonnere på events og for å velge ut hvilke EventArgs-data man er interessert i. Men den virkelige styrken til Rx ligger i muligheten til å lage nye eventstrømmer ved å kombinere andre strømmer, noe som kan gjøres på mange forskjellige måter.

Cross joins

Som i vanlig LINQ kan man gjøre en cross join. I databasesammenheng vil dette gi kryssproduktet av de to tabellene man slår sammen – det vil si alle mulige kombinasjoner av ett element fra den ene tabellen og ett element fra den andre. En cross join av to IObservable fungerer på en tilsvarende, men litt annerledes måte. Betrakt følgende spørring:

var crossJoin = from a in streamA
                from b in streamB
                select new { a, b };

Dette fungerer som følger:

  • For hver notifikasjon fra streamB vil crossJoin sende ut én notifikasjon for hver notifikasjon som hittil har kommet fra streamA. Hver av de utløste notifikasjonene inneholder informasjon om én notifikasjon fra streamA og én notifikasjon fra streamB. Dette kan ses på som et inkrementelt kryssprodukt hvor det er den innerste eventstrømmen som styrer produksjonen av kryssproduktet, så dette er med litt godvilje i tråd med den vanlige LINQ-oppførselen.
  • Hvis streamB signaliserer at den er ferdig (ved å kalle OnCompleted() på observatørene sine), vil spørringens interne liste over mottatte notifikasjoner fra streamA nullstilles, og spørringen vil re-abonnere på streamB.

Konsekvensen av disse reglene er at cross joins kan brukes til å starte en ny abonnering på streamB hver gang vi får en notifikasjon fra streamA – og at hver enkelt abonnering vil avsluttes når man når slutten på streamB. Den enkleste måten å forstå dette på vil sannsynligvis være å betrakte følgende kodeeksempel (forutsetter at passende extension methods er definert). Until() blir forklart nærmere i neste avsnitt, men sørger kort fortalt for at én eventstrøm kan terminere en annen.

var mouseDrag = from mouseDown in this.GetMouseDown()
                from mouseMove in this.GetMouseMove().Until(this.GetMouseUp())
                select mouseMove.EventArgs.Location;
mouseDrag.Subscribe(loc => Console.WriteLine(loc));

Tolkningen av dette er: “hver gang det kommer en MouseDown skal vi begynne å lytte etter MouseMove frem til det kommer en MouseUp”. Hvis vi heller bruker en lambdafunksjon som f.eks. fargelegger et punkt på Form‘en, har vi dermed fått et lite tegneprogram. (Den spesifikke funksjonaliteten som beskrives her er selvfølgelig triviell å oppnå med vanlig eventhåndtering også; formålet var bare å gi en enkel illustrasjon av cross joins – mer avanserte anvendelser kommer lenger ned.)

Merging og timing

Her viser vi noen flere hendige funksjoner som kan brukes for å kombinere eventstrømmer. Det forutsettes at a og b er forhåndsdefinerte IObservable-instanser. Hvert av uttrykkene returnerer en ny IObservable med den beskrevne oppførselen.

  • a.Merge(b) gir en ny strøm som fletter sammen elementer fra a og b i den rekkefølgen de dukker opp. Hvis det kommer flere notifikasjoner fra den ene strømmen uten at det kommer noe fra den andre, vil de sendes videre uten å vente på den andre strømmen.
  • a.Zip(b, func) tar inn en to-parameter-lambdafunksjon som brukes til å produsere nye elementer basert på notifikasjoner fra a og b. Hver gang det kommer en notifikasjon fra den ene strømmen, må Zip() nødvendigvis vente til det kommer en notifikasjon fra den andre strømmen, så man har noe å kombinere med.
  • a.Until(b) spesifiserer at man bare er interessert i notifikasjoner fra a inntil det kommer en notifikasjon fra b. Med en gang b produserer noe, termineres strømmen ved at OnCompleted() kalles på alle som observerer a.Until(b). Dette gjør det enkelt å kansellere abonneringer når visse hendelser inntreffer.
  • a.WaitUntil(b) spesifiserer at man er interessert i alle notifikasjoner fra a, men at man først vil vite om dem når det kommer en notifikasjon fra b. Med en gang det kommer noe fra b, vil alt som hittil er mottatt fra a sendes videre, og deretter oppfører det seg som en vanlig abonnering på a.
  • a.Delay(t) venter i t millisekunder med å videresende notifikasjoner fra a.
  • Observable.Return(x) lager en strøm med bare ett element (og som vil termineres etter at det elementet er lest), nemlig x.

Her ser vi én av mange kreative måter å kombinere noen av disse funksjonene på:

var delayedAUnlessB =
    from a in eventA
    from b in Observable.Return(new object())
                        .Delay(1000)
                        .Until(eventB)
    select a;

Dette vil gi følgende effekt: hver gang eventA utløses, vil vi få vite om den ett sekund senere – med mindre eventB utløses i mellomtiden. Det er lov å la eventA og eventB være den samme eventstrømmen; effekten blir da at hver gang det kommer en rekke av eventA-notifikasjoner hvor hver notifikasjon kommer innen ett sekund etter den forrige, får man bare vite om den siste i rekken. Hvis dette høres søkt ut, se neste seksjon for en stilig anvendelse av det.

Asynkron programmering

Asynkron programmering ved hjelp av IAsyncResult er støttet av enkelte klasser i .NET som brukes til tidkrevende operasjoner, f.eks. WebRequest. Formålet er å hindre at kodeutførelsen blokkeres, ved at den asynkrone funksjonen fyrer av en bakgrunnstråd som utløser en event når resultatet er ferdig. Men som [1] påpeker, kan feilhåndtering være kronglete, og hvis man bruker lambdafunksjoner for å få koden fin og kompakt, er det ikke mulig å avslutte abonneringen hvis man ønsker det (eventhåndterere som ikke brukes lenger, men som ikke blir avregistrert, er en av måtene å lage minnelekkasjer på i C#). I tillegg er det noe strevsomt å bygge inn støtte for asynkron programmering i sin egen kode. Rx gjør dette mye enklere og løser også de tidligere nevnte problemene. Observable.ToAsync() (som sannsynligvis har den største overload-listen i hele .NET) lager en asynkron utgave av en funksjon. Når den resulterende funksjonen kalles, får man en IObservable som kommer til å produsere ett element – nemlig returverdien til funksjonen (som dukker opp idet funksjonen blir ferdig) – og så terminere. ToAsync kan ta inntil 16 typeparametre: det siste er returtypen; alle de foregående er parametertypene. (Hvis man har behov for å lage en asynkron utgave av en funksjon som tar 16 parametre, er sannsynligvis tiden inne for en liten refaktorering…) Slik kan vi f.eks. lage en asynkron kvadratrotutregning:

var sqrt = Observable.ToAsync((number) => Math.Sqrt(number));

sqrt kan vi enten ta vare på for å eksplisitt be om resultatet når vi trenger det på et senere tidspunkt (ved å skrive double s = sqrt(42).First(); – dette vil blokkere inntil funksjonen er ferdig), eller vi kan registrere en lambdafunksjon for å håndtere resultatet når det dukker opp (og evt. en lambdafunksjon for å håndtere feil som måtte oppstå):

sqrt(42).Subscribe(answer => Console.WriteLine("The square root is {0}", answer));

I tillegg kan asynkrone funksjoner benyttes i LINQ-spørringer, som vi skal se i eksempelet nedenfor. For øvrig må man være bevisst på at når man lytter på en asynkron funksjon, vil notifikasjonen komme fra en annen tråd – så man må tenke på eventuelle race conditions som kan oppstå. Dessuten fungerer dette dårlig i en WinForms-applikasjon hvis man har tenkt å reagere på notifikasjonen ved å manipulere GUI’et, da WinForms har en dedikert GUI-tråd (hovedtråden til applikasjonen), som er den eneste tråden som har lov til å røre GUI’et. Hvis koden som abonnerer på den asynkrone funksjonen skal gjøre noe som helst med GUI’et, må man kalle s.SubscribeOnWindowsForms(this).Subscribe(...), hvor this refererer til den Form‘en hvis tråd man ønsker å benytte.

Eksempel: Autocompletion

Inspirert av [5] skal vi nå vise hvordan vi kan lage funksjonalitet som etterligner autocompletion-funksjonaliteten i f.eks. google sin søkeboks – etterhvert som man skriver, vil man få opp forslag til fullføringer av det man har skrevet inn. Vi tenker oss at det eksisterer en klasse AutocompletionWebService med en metode string[] GetSuggestions(string prefix) som bruker noen sekunder på å returnere. Hvis vi kaller denne metoden direkte under håndtering av en event som har sin opprinnelse i en GUI-event, vil GUI’et henge. Hovedvinduet til programmet inneholder en TextBox som heter SearchBox og en ListBox som heter ResultList. Kravspekken lyder som følger:

“Ett sekund etter siste tekstendring i tekstboksen skal søket startes i bakgrunnen. Når resultatet kommer tilbake skal listeboksen populeres med forslagene, men bare hvis teksten i tekstboksen ikke er blitt endret i mellomtiden.”

Dette kan vi løse slik i WinForms (all koden skal i Form-subklassen sin constructor eller OnLoad()):

  1. Den sentrale eventen her er TextBox.TextChanged, så vi må pakke den inn i en IObservable. Forutsatt at en passende extension event er definert, kan vi gjøre det slik:
    var textChanged = this.SearchBox.GetTextChanged();

    Hvis ikke, må vi skrive litt mer kode:

    var textChanged = Observable.FromEvent<EventArgs>(this.SearchBox, "TextChanged");
  2. Web service-kallet må pakkes inn i en asynkron funksjon:
    var webService = new AutocompletionWebService();
    var asyncSearch = Observable.ToAsync(text => webService.GetSuggestions(text));
  3. Vi må definere at vi kun er interessert i tekstendringer etter ett sekund med inaktivitet, og at for hver slik tekstendring skal web servicen kalles. I tillegg er vi bare interessert i resultatet fra web servicen dersom ikke teksten er blitt endret i mellomtiden.
    var searchResult =
        from tc in textChanged
        let text = this.SearchBox.Text
        from waitHere in Observable.Return(new Unit())
                                   .Delay(1000)
                                   .Until(textChanged)
        from results in asyncSearch(text).Until(textChanged)
        select new { text, results };
  4. Til sist må vi abonnere på evenstrømmen vi har laget, og sørge for at vi mottar notifikasjonene på GUI-tråden:
    searchResult.SubscribeOnWindowsForms(this).Subscribe(t => {
        this.ResultList.Items.Clear();
        this.ResultList.Items.AddRange(t.results);
    });

Voilà! Det samme vil selvfølgelig være mulig å gjøre uten Rx, men det vil kreve en intrikat kombinasjon av timere og tilstandsvariabler inni event handlers – som det ikke vil være særlig trivielt å teste. Med Rx får vi uttrykt det samme mye mer konsist, og med mindre risiko for å gjøre noe feil. I denne sammenhengen kan det også nevnes at Rx allerede blir brukt internt i Microsoft sin kode – i enhetstestene til Silverlight.

Da er det bare å slippe kreativiteten løs; hvis noen har eksempler på spennende ting de har gjort i Rx, må man gjerne poste det i en kommentar!

Referanser

  1. Introduksjon til Rx og asynkron programmering
  2. Forklaring av dualiteten mellom Iterator pattern og Observer pattern
  3. “Extension events” og diskusjon om hvordan cross joins fungerer
  4. Drag-and-drop med Rx
  5. Inkrementelt søk med asynkron programmering i Rx
  6. Foredrag om IObservable/IObserver
  7. Foredrag om Rx
  8. Foredrag om Rx
  9. Bindable LINQ (et rammeverk basert på mye av den samme tankegangen som Rx)
  • http://blog.goeran.no Gøran Hansen

    Hei Åsmund

    Jeg synes dette var en flott post! Du er flink til å formidle budskapet gjennom et godt språk og gode eksempler:)

    Reactive Programming er et spennende konsept, og med et godt rammeverk eller innebygd støtte i programmeringsspråket kan det være med på å forenkle hverdagen til oss programmerere. Jeg jobber mye med brukergrensesnitt kode, og er pga. det eksponert for asynkron programmering og tråding. I dag må jeg alltid skrive små rammeverk for å forenkle koden – slik at den blir mer lesbar og testbar. Takket være din post skal jeg undersøke Rx rammeverket nærmere.

    Ulempen med rammeverk som forenkler komplekse problemstillinger gjennom abstraksjoner, er at det kan føre til at uerfarne programmerere ikke forstår hva som skjer ”under-the-hood”. Noe som igjen fører til dårlige løsninger…

    Et bruksområde jeg umiddelbart ser for Rx er til testing. Ofte når jeg skriver GUI kode jobber jeg med en modell som er ”observable” (MVC, MVP, MVVM/PresentationModel). Det er mange ganger jeg har behov for å skrive ”recordere” for hånd, slik at jeg kan spore events som blir fyrt. Dette kan jeg f.eks bruke som datagrunnlag for å validere at koden har kjørt som forventet.
    Kanskje jeg til og med skriver en e-post hvis jeg finner ut noe interessant!

    Gøran

  • http://twitter.com/virtueme Benny Thomas

    Flott post, du har funnet essensen i RX, jeg synes eksemplene fra RX gjengen fokuserer veldig mye på eventer fra musbevegelser og er ikke direkte anvendelig for vanlige Enterprise programmerer.

    Jeg har selv brukt det i ett scenario der vi søker på ulike kilder (5 forskjellige webservicer). Det fine var at vi slapp å skrive synkroniseringshåndtering eller threadhåndtering. Koden ble som følger:

    public class SearchEngine : ISearchEngine
    {
    private readonly List engines;

    public SearchEngine()
    {
    engines = new List();
    }

    public void Add(ISøkRepository søkRepository)
    {
    engines.Add(søkRepository);
    }

    public int Count
    {
    get { return engines.Count; }
    }

    public ObservableCollection Søk(SøkParametere parametere)
    {
    var liste = new ObservableCollection();

    var alle = engines
    .Where(x => parametere.Type == null || x.Type.Equals(parametere.Type))
    .Select(
    x => x.Søk(parametere.Kode, parametere.Navn)
    .Where(parametere.HarBruker)
    .Where(parametere.HarSistEndret)
    .Where(parametere.HarStatus).ToObservable());

    var merge = alle.Merge();

    merge.Subscribe(x => liste.Add(x));

    return liste;
    }
    }

    Dette funker fint, det eneste problemet jeg har er at jeg må vente ett sekund i testen min før jeg kan kjøre en assert på innholdet i listen.

    Benny/VirtueMe

  • http://ox.no Håvard Stranden

    Hei, Åsmund!

    Veldig bra post! Jeg ser Rx fra et mer generelt perspektiv – egentlig er dette generatorer, og Rx er en uttrykksfull måte å bruke generatorer på for en del brukstilfeller. Det hadde vært veldig interessant å finne flere brukstilfeller for mer generelle konsepter (korutiner og continuations) for å se hvor uttrykksfullt et OO-pattern i kombinasjon med disse blir i en del brukstilfeller på .NET-plattformen, og sammenligne med implementasjoner av de samme brukstilfellene i språk som har generatorer, korutiner og continuations som first-class citizens. Kanskje er det på tide å leke med et Continuation Passing Style-bibliotek for .NET som ser ut noe som Rx? Food for thought.

  • http://jonas.follesoe.no Jonas Follesø

    Super post om et spennende tema.

    Er prinsipielt enig med Gørans kommentar om rammeverk som forsøker å forenkle komplekse problemstillinger. Men i dette tilfellet har jeg ikke denne bekymringen. I dag har .NET to designmønster for å eksponere asynkrone API, enten gjennom BeginXXX og EndXXX metoder, eller via Events (http://msdn.microsoft.com/en-us/library/ms228969.aspx).

    Særlig BeginXXX og EndXXX designmønsteret er tungvindt å jobbe med, og noe jeg tror mange utviklere sliter med å forstå. I framtiden tror (og håper) jeg at Rx blir standard for nye asynkrone API, og at gamle API får Rx wrappere. Kode som bruker det Event-baserte designmønsteret for asynkron programmering vil lettere kunne konsumeres via Rx gjennom extension metoder. Microsoft har ikke sagt noe offisielt om Rx blir anbefalt modell for nye asynkrone API.

    Sjekk forøvrig denne posten http://themechanicalbride.blogspot.com/2009/07/developing-with-rx-part-2-converting.html for oversikt over noen av utfordringene med dagens asynkrone designmønster i .NET.

    Dette betyr jo at Rx egner seg til langt mer enn GUI programmering og tester, og (forhåpentligvis) vil bli et langt mer fundamentalt konsept i .NET.

    - Jonas

  • Aasmund Eldhuset

    Gøran: Mange takk! Ja, jeg synes at de har truffet spikeren på hodet her når det gjelder å finne et problem som folk gang på gang skriver egne minirammeverk for, og å så lage et “offisielt” rammeverk for det. Du har utvilsomt rett angående faren med rammeverk; jeg har selv brent meg flere ganger på ikke å forstå rammeverk jeg bruker (f.eks. Hibernate) godt nok… Heldigvis er ikke Rx så veldig stort i omfang, så det burde stort sett gå greit så lenge man forstår kjerneidéen – og dét er grunnen til at jeg brukte så mye tekst på teorien.

    Jeg tror også at enhetstesting av GUI kan understøttes av Rx, og selv om det nok kommer til å forbli en kronglete oppgave, vil det være deilig å slippe å skrive så mye boilerplate-kode nettopp for sporing av event-avfyring. Ulempen med Rx er sånn sett at folk kan komme til å skrive enda mer kompliserte GUI’er som det blir enda vanskeligere å teste… :) (Men hvis man bruker Rx riktig, vil det ihvertfall komme tydeligere frem hva som er intensjonen med koden, enn hvis man har en intrikat kombinasjon av tilstandshåndtering og timere inni massevis av event handlers.)

  • Aasmund Eldhuset

    Benny: Takk! Interessant eksempel du kom med. Tolker jeg det rett som at ISøkRepository.Søk() er synkron, slik at alle søkene kjøres sekvensielt og at listen først fylles ut når alle søkene er ferdige? Stilig bruk av ToObservable() og Merge(), i så fall. Alternativet, som du sikkert har funnet ut for lenge siden, er jo å bruke ToAsync() på selve søkeoperasjonen, men du er kanskje i en situasjon hvor man ikke vil tjene noe særlig på å kjøre søkene parallelt?

    Hvis du mener at du kjører Thread.Sleep() i testen din, tror jeg du kan unngå det f.eks. slik:

    AutoResetEvent waitHandle = new AutoResetEvent(false);
    // Kode her for å lage en SearchEngine og kalle Søk()
    result.Subscribe(x => {
    list.Add(x));
    if (/* listen har ønsket størrelse */)
    waitHandle.Set();
    });
    waitHandle.WaitOne();

    Mer kode, men da slipper man ihvertfall å måtte gjette hvor lenge man bør vente, og risikere at ventetiden er for kort eller kaste bort tid fordi den er for lang. Lurer på om det finnes en mer elegant metode å spesifisere at man vil vente inntil en event er mottatt (trolig ikke, for det er slik kodemalene for Windows Workflow Foundation gjør, for der har Main() behov for å vente på en event…)

  • Aasmund Eldhuset

    Håvard: Takk skal du ha! Ja, dette er jo ett av mange eksempler på stilige funksjonsorienterte konsepter som er på vei inn i C# og/eller .NET, og som lar oss skrive mer uttrykksfull kode. For dem som synes at C# fortsatt ikke er funksjonelt nok, finnes jo F#, som jeg regner med at du kjenner til. (Til dem som ikke har vært borti F# før: det er et funksjons- og objektorientert språk som er bygd på toppen av .NET, så hele .NET-API’et er tilgjengelig – så F# står til .NET som Scala står til JVM.) Disse artiklene om continuations i C# og F# ser interessante ut: http://www.markhneedham.com/blog/2009/06/22/f-continuation-passing-style/ og http://weblogs.asp.net/podwysocki/archive/2008/08/13/recursing-on-recursion-continuation-passing.aspx .

    Jeg må innrømme at jeg ikke kan fullt så mye funksjonsorientert programmering som jeg skulle ønske – på tide at jeg setter meg ned og leser Structure and Interpretation of Computer Programs…