Reactive Extensions for .NET (Rx) er et nylig utgitt bibliotek som i mange situasjoner kan gjøre eventhåndtering og asynkron programmering lettere og mer elegant. Både WinForms, WPF og Silverlight er støttet. Det finnes begrenset med offisiell dokumentasjon, men det er skrevet mange entusiastiske blogginnlegg om temaet (se referansene nederst på siden). Formålet med denne artikkelen er å samle litt av denne informasjonen for å gi en innføring i Rx – både praktiske anvendelser og det teoretiske fundamentet som ligger bak. Leseren bør kunne C# og LINQ, og bør kjenne til Iterator pattern.
Den grunnleggende idéen bak Rx er at en .NET-event kan ses på som en kilde som produserer en sekvens av notifikasjoner (beskjeder om at eventen er blitt utløst) – og hvis en event er en sekvens, burde den kunne manipuleres ved hjelp av LINQ! Dette høres kanskje ut som å tøye strikken litt langt, så la oss se på hvilke karakteristikker en slik notifikasjonssekvens har:
- Den er ikke “forhåndsutfylt”, men blir fylt ut etterhvert som tiden går og det kommer inn notifikasjoner fra eventen.
- Lengden er potensielt sett ubegrenset.
- Vi kan ikke uten videre be om å få neste element fra sekvensen; vi må belage oss på å vente på neste notifikasjon og reagere på den når den kommer. I praksis gjøres dette ved å registrere en event handler.
De to første egenskapene deles faktisk med vanlige itererbare sekvenser, som kan lazy loades og/eller gradvis fylles ut fra en treg datakilde (f.eks. en database) – og det er lov å lage iteratorer som ikke egentlig har noen datakilde, men som bare genererer en sekvens med tall som ikke nødvendigvis stopper noen gang. Det er kun den siste egenskapen som er annerledes: kontrollen ligger hos datakilden, ikke hos oss. Vi skal nå studere litt av teorien bak programmeringsmodellen det hintes til her, før vi viser et par meget spennende anvendelser av det.
Sammenhengen mellom Iterator pattern og Observer pattern
(Diskusjonen i denne seksjonen er delvis basert på [2]. Det er ikke nødvendig (bare interessant) å kjenne til den bakenforliggende teorien, så man kan hoppe rett til anvendelsene hvis man foretrekker dét.
C# implementerer Iterator pattern ved hjelp av to interfaces som her presenteres i en noe nedstrippet utgave:
interface IEnumerable<T> { IEnumerator<T> GetEnumerator(); } interface IEnumerator<T> { T Current { get; } // Might throw exception bool MoveNext(); }
(Siden C# ikke har noen throws-setning, har vi markert med en kommentar at kall til Current kan resultere i en exception.) La oss betrakte sekvensdiagrammet for en typisk anvendelse av disse interfacene:

(I praksis er det nok oftest MoveNext() som kontakter datakilden, men det som illustreres her er en mulig måte å gjøre det på.) Hovedpunktene er:
- Vi går til datakilden (som er abstrahert som en
IEnumerable<T>) og ber den om å få et objekt vi kan bruke til å interagere med datakilden – nemlig enIEnumerator<T>, som da blir vårt abstrakte “innsynspunkt” til datakilden. - Hver gang vi har lyst på et nytt element fra datakilden, går vi til enumeratoren og ber den om å få det neste elementet. Enumeratoren vil så gå til datakilden, hente neste objekt (av typen
T) og gi det til oss.
Dette er et eksempel på en interaktiv programmeringsmodell, fordi det er vi som kontrollerer programflyten ved å bestemme når vi ønsker å få et nytt element. Det motsatte av dette er en reaktiv programmeringsmodell, hvor vår kode i utgangspunktet er passiv, og kun foretar seg noe når den får beskjed om at noe har skjedd. En vanlig måte å modellere dette på er Observer pattern, som i .NET-rammeverket lenge har vært indirekte implementert gjennom støtten for events. Rx gir oss en mer generell implementasjon av Observer pattern, gjennom IObservable<T> og IObserver<T>:
interface IObservable<T> { IDisposable Subscribe(IObserver<T> observer); } interface IObserver<T> { void OnNext(T value); void OnError(Exception exception); void OnCompleted(); }
Kort oppsummert brukes disse interfacene slik:
- Vi lager en ny observatør (som må implementere
IObserver<T>), som skal fungere som datakildens abstrakte innsynspunkt til vår kode, og gir denne til datakilden, som må markere at den kan observeres ved å implementereIObservable<T>. (Merk at “datakilden” ikke trenger å være en tradisjonell liste eller database; det kan for eksempel være en event eller et annet objekt som det gjentatte ganger kan “skje noe interessant” med.) - Hver gang datakilden produserer et nytt element (eller hver gang det skjer ett eller annet interessant som datakilden mener at observatørene bør vite om), sier den fra til observatørene ved å kalle
OnNext(),OnCompleted()ellerOnError(), som så kaller en metode i vår kode.
La oss betrakte sekvensdiagrammet for dette:

Dette ligner veldig på det forrige sekvensdiagrammet, bare at pilene går den andre veien! Det viser seg faktisk at IObservable<T> og IObserver<T> kan ses på som de “motsatte” interfacene av IEnumerable<T> og IEnumerator<T>:
IEnumerable<T>.GetEnumerator()tar ingen parametre og returnerer enIEnumerator, mensIObservable<T>.Subscribe()tar inn enIObserver<T>og hadde i utgangspunktet ikke trengt å returnere noe (men pga. et designmessig valg returerer den enIDisposable; dette blir forklart lenger nede).IEnumerator<T>.Currenter en property, som kan sees på som en parameterløs metode. Den vil enten returnere enTeller kaste en exception. Det motsatte av dette vil da være være tovoid-metoder hvor den ene tar inn enTog den andre tar inn en exception:IObserver<T>.OnNext()ogIObserver<T>.OnError().IEnumerator<T>.MoveNext()tar ingen parametre og returnerer enboolsom forteller om det finnes flere elementer. Det omvendte skulle da i utgangspunktet være en metode som tar inn enboolsom forteller om det kommer flere elementer. Men “det kommer flere elementer” er en overflødig opplysning i en reaktiv modell, fordi vi vet at det neste elementet blir servert til oss viaOnNext()når det kommer – derfor kan vi bestemme at metoden bare skal kalles når den ellers ville ha blitt kalt medfalsefor å opplyse om at det ikke kommer flere elementer. Det naturlige navnet blir daIObserver<T>.OnCompleted().
Innenfor matematikken kalles dette fenomenet for dualitet, og den interesserte leser / videoseer kan lære mer i [7] og [8].
Bruk av Rx
Da skulle teorien være på plass, så da er det på tide å vise hva vi kan bruke dette til. Tanken er altså at siden observerbare objekter deler mange karakteristikker med itererbare objekter, bør vi kunne skrive spørringer mot dem.
Merk: all koden vi kommer til å vise her er ment for å plasseres inni constructoren til en Form i et konsollprosjekt som også bruker WinForms – lages enklest ved å lage et nytt WinForms-prosjekt og så gå på Project Properties og si at det er en Console Application. Prosjektet vil trenge referanser til System.Reactive og System.CoreEx.
Aller først må vi skaffe oss et observerbart objekt, helst basert på en event. Observable er en statisk hjelpeklasse som blant annet lar oss gjøre følgende:
var mouseMove = Observable.FromEvent<MouseEventArgs>(this, "MouseMove");
Dette gir oss en IObservable<Event<MouseEventArgs>>, som representerer en strøm av notifikasjoner som vil produseres etterhvert som musen beveges. En Event<E>-instans representerer det vi i denne artikkelen har kalt for notifikasjoner – de opplyser om at en event er blitt utløst, og inneholder objektet eventen ble utløst på og EventArgs-objektet (av den spesifikke typen E) som ville ha blitt sendt inn til eventhåndtereren. Siden LINQ behandler en IObservable<T> som en sekvens av T-instanser, kan vi kjøre spørringer mot mouseMove for å trekke ut data fra notifikasjonene som kommer til å komme:
var locations = from mm in mouseMove select new { mm.EventArgs.X, mm.EventArgs.Y };
Dette vil gi en ny IObservable, denne gangen med den anonyme typen new { int X, int Y }, som representerer en strøm av x- og y-koordinater. Vi kan nå abonnere på notifikasjoner fra locations ved hjelp av Subscribe(). Dette er en extension method som i utgangspunktet lar oss oppgi en IObserver som vil bli lagt til i det observerbare objektet sin liste over observatører/abonnenter og som dermed vil få beskjed hver gang det skjer noe interessant. Men det ville være strevsomt å implementere IObserver hver gang man skal abonnere på noe, så heldigvis er Subscribe() overloadet til å la oss oppgi et lambdauttrykk for hva vi ønsker at skal skje når det produseres et nytt element (evt. også lambdauttrykk for hva som skal skje når strømmen er slutt eller hvis det oppstår en feil). Dette uttrykket vil automatisk bli pakket inn i en IObserver som så vil bli satt til å abonnere på det observerbare objektet:
locations.Subscribe(pos => Console.WriteLine("{0}, {1}", pos.X, pos.Y));
Nå vil musens x- og y-koordinater bli printet hver gang den beveger seg. Vi kunne også ha abonnert direkte på mouseMove; man må ikke benytte LINQ-spørringer til å trekke ut informasjon.
Hvordan kan man “si opp abonnementet”? Det viser seg at Subscribe() returnerer en IDisposable som fungerer slik at Dispose() på denne vil gjennomføre avmeldingen. Dermed kan man enten bruke using for å abonnere i en kort periode, eller man kan ta vare på returverdien fra Subscribe() og melde seg av på et helt annet punkt i programmet (kanskje utløst av en annen event).
Merk: mouseMove og locations i koden ovenfor er altså IObservable-instanser. I utgangspunktet kan en IObservable representere hva som helst, men siden vi (og Rx) benytter det kun til eventhåndtering, kommer vi flere steder til å snakke om IObservable-instanser som “eventstrømmer” – enten de stammer direkte fra en C#-event (slik som mouseMove gjør) eller er avledet av andre IObservable (slik som locations er).
“Extension events”
Som vi ser, er dessverre syntaksen for å få tak i en IObservable for en event ganske strevsom. Som beskrevet i [3] kan man kapsle inn dette i extension methods langs disse linjene:
internal static class ExtensionMethods { public static IObservable<IEvent<MouseEventArgs>> GetMouseMove(this Control control) { return Observable.FromEvent<MouseEventArgs>(control, "MouseMove"); } }
Parametertypen bør være den mest generelle typen som har den aktuelle eventen (dette blir først kontrollert når funksjonen kalles), og pass på at EventArgs-typen er riktig for den aktuelle eventen. Merk at mange blogginnlegg antar at leserne vet at dette er noe som bør gjøres, og benytter f.eks. GetMouseMove() uten eksplisitt å definere den noen plass, slik at man kan få inntrykk av at det er en Rx-definert metode.
Kombinering og timing av events
Det vi har vist hittil ser bare ut som en marginalt mer kompakt syntaks for å abonnere på events og for å velge ut hvilke EventArgs-data man er interessert i. Men den virkelige styrken til Rx ligger i muligheten til å lage nye eventstrømmer ved å kombinere andre strømmer, noe som kan gjøres på mange forskjellige måter.
Cross joins
Som i vanlig LINQ kan man gjøre en cross join. I databasesammenheng vil dette gi kryssproduktet av de to tabellene man slår sammen – det vil si alle mulige kombinasjoner av ett element fra den ene tabellen og ett element fra den andre. En cross join av to IObservable fungerer på en tilsvarende, men litt annerledes måte. Betrakt følgende spørring:
var crossJoin = from a in streamA from b in streamB select new { a, b };
Dette fungerer som følger:
- For hver notifikasjon fra
streamBvilcrossJoinsende ut én notifikasjon for hver notifikasjon som hittil har kommet frastreamA. Hver av de utløste notifikasjonene inneholder informasjon om én notifikasjon frastreamAog én notifikasjon frastreamB. Dette kan ses på som et inkrementelt kryssprodukt hvor det er den innerste eventstrømmen som styrer produksjonen av kryssproduktet, så dette er med litt godvilje i tråd med den vanlige LINQ-oppførselen. - Hvis
streamBsignaliserer at den er ferdig (ved å kalleOnCompleted()på observatørene sine), vil spørringens interne liste over mottatte notifikasjoner frastreamAnullstilles, og spørringen vil re-abonnere påstreamB.
Konsekvensen av disse reglene er at cross joins kan brukes til å starte en ny abonnering på streamB hver gang vi får en notifikasjon fra streamA – og at hver enkelt abonnering vil avsluttes når man når slutten på streamB. Den enkleste måten å forstå dette på vil sannsynligvis være å betrakte følgende kodeeksempel (forutsetter at passende extension methods er definert). Until() blir forklart nærmere i neste avsnitt, men sørger kort fortalt for at én eventstrøm kan terminere en annen.
var mouseDrag = from mouseDown in this.GetMouseDown() from mouseMove in this.GetMouseMove().Until(this.GetMouseUp()) select mouseMove.EventArgs.Location; mouseDrag.Subscribe(loc => Console.WriteLine(loc));
Tolkningen av dette er: “hver gang det kommer en MouseDown skal vi begynne å lytte etter MouseMove frem til det kommer en MouseUp”. Hvis vi heller bruker en lambdafunksjon som f.eks. fargelegger et punkt på Form‘en, har vi dermed fått et lite tegneprogram. (Den spesifikke funksjonaliteten som beskrives her er selvfølgelig triviell å oppnå med vanlig eventhåndtering også; formålet var bare å gi en enkel illustrasjon av cross joins – mer avanserte anvendelser kommer lenger ned.)
Merging og timing
Her viser vi noen flere hendige funksjoner som kan brukes for å kombinere eventstrømmer. Det forutsettes at a og b er forhåndsdefinerte IObservable-instanser. Hvert av uttrykkene returnerer en ny IObservable med den beskrevne oppførselen.
a.Merge(b)gir en ny strøm som fletter sammen elementer fraaogbi den rekkefølgen de dukker opp. Hvis det kommer flere notifikasjoner fra den ene strømmen uten at det kommer noe fra den andre, vil de sendes videre uten å vente på den andre strømmen.a.Zip(b, func)tar inn en to-parameter-lambdafunksjon som brukes til å produsere nye elementer basert på notifikasjoner fraaogb. Hver gang det kommer en notifikasjon fra den ene strømmen, måZip()nødvendigvis vente til det kommer en notifikasjon fra den andre strømmen, så man har noe å kombinere med.a.Until(b)spesifiserer at man bare er interessert i notifikasjoner fraainntil det kommer en notifikasjon frab. Med en gangbproduserer noe, termineres strømmen ved atOnCompleted()kalles på alle som observerera.Until(b). Dette gjør det enkelt å kansellere abonneringer når visse hendelser inntreffer.a.WaitUntil(b)spesifiserer at man er interessert i alle notifikasjoner fraa, men at man først vil vite om dem når det kommer en notifikasjon frab. Med en gang det kommer noe frab, vil alt som hittil er mottatt fraasendes videre, og deretter oppfører det seg som en vanlig abonnering påa.a.Delay(t)venter itmillisekunder med å videresende notifikasjoner fraa.Observable.Return(x)lager en strøm med bare ett element (og som vil termineres etter at det elementet er lest), nemligx.

Her ser vi én av mange kreative måter å kombinere noen av disse funksjonene på:
var delayedAUnlessB = from a in eventA from b in Observable.Return(new object()) .Delay(1000) .Until(eventB) select a;
Dette vil gi følgende effekt: hver gang eventA utløses, vil vi få vite om den ett sekund senere – med mindre eventB utløses i mellomtiden. Det er lov å la eventA og eventB være den samme eventstrømmen; effekten blir da at hver gang det kommer en rekke av eventA-notifikasjoner hvor hver notifikasjon kommer innen ett sekund etter den forrige, får man bare vite om den siste i rekken. Hvis dette høres søkt ut, se neste seksjon for en stilig anvendelse av det.
Asynkron programmering
Asynkron programmering ved hjelp av IAsyncResult er støttet av enkelte klasser i .NET som brukes til tidkrevende operasjoner, f.eks. WebRequest. Formålet er å hindre at kodeutførelsen blokkeres, ved at den asynkrone funksjonen fyrer av en bakgrunnstråd som utløser en event når resultatet er ferdig. Men som [1] påpeker, kan feilhåndtering være kronglete, og hvis man bruker lambdafunksjoner for å få koden fin og kompakt, er det ikke mulig å avslutte abonneringen hvis man ønsker det (eventhåndterere som ikke brukes lenger, men som ikke blir avregistrert, er en av måtene å lage minnelekkasjer på i C#). I tillegg er det noe strevsomt å bygge inn støtte for asynkron programmering i sin egen kode. Rx gjør dette mye enklere og løser også de tidligere nevnte problemene. Observable.ToAsync() (som sannsynligvis har den største overload-listen i hele .NET) lager en asynkron utgave av en funksjon. Når den resulterende funksjonen kalles, får man en IObservable som kommer til å produsere ett element – nemlig returverdien til funksjonen (som dukker opp idet funksjonen blir ferdig) – og så terminere. ToAsync kan ta inntil 16 typeparametre: det siste er returtypen; alle de foregående er parametertypene. (Hvis man har behov for å lage en asynkron utgave av en funksjon som tar 16 parametre, er sannsynligvis tiden inne for en liten refaktorering…) Slik kan vi f.eks. lage en asynkron kvadratrotutregning:
var sqrt = Observable.ToAsync((number) => Math.Sqrt(number));
sqrt kan vi enten ta vare på for å eksplisitt be om resultatet når vi trenger det på et senere tidspunkt (ved å skrive double s = sqrt(42).First(); – dette vil blokkere inntil funksjonen er ferdig), eller vi kan registrere en lambdafunksjon for å håndtere resultatet når det dukker opp (og evt. en lambdafunksjon for å håndtere feil som måtte oppstå):
sqrt(42).Subscribe(answer => Console.WriteLine("The square root is {0}", answer));
I tillegg kan asynkrone funksjoner benyttes i LINQ-spørringer, som vi skal se i eksempelet nedenfor. For øvrig må man være bevisst på at når man lytter på en asynkron funksjon, vil notifikasjonen komme fra en annen tråd – så man må tenke på eventuelle race conditions som kan oppstå. Dessuten fungerer dette dårlig i en WinForms-applikasjon hvis man har tenkt å reagere på notifikasjonen ved å manipulere GUI’et, da WinForms har en dedikert GUI-tråd (hovedtråden til applikasjonen), som er den eneste tråden som har lov til å røre GUI’et. Hvis koden som abonnerer på den asynkrone funksjonen skal gjøre noe som helst med GUI’et, må man kalle s.SubscribeOnWindowsForms(this).Subscribe(...), hvor this refererer til den Form‘en hvis tråd man ønsker å benytte.
Eksempel: Autocompletion
Inspirert av [5] skal vi nå vise hvordan vi kan lage funksjonalitet som etterligner autocompletion-funksjonaliteten i f.eks. google sin søkeboks – etterhvert som man skriver, vil man få opp forslag til fullføringer av det man har skrevet inn. Vi tenker oss at det eksisterer en klasse AutocompletionWebService med en metode string[] GetSuggestions(string prefix) som bruker noen sekunder på å returnere. Hvis vi kaller denne metoden direkte under håndtering av en event som har sin opprinnelse i en GUI-event, vil GUI’et henge. Hovedvinduet til programmet inneholder en TextBox som heter SearchBox og en ListBox som heter ResultList. Kravspekken lyder som følger:
“Ett sekund etter siste tekstendring i tekstboksen skal søket startes i bakgrunnen. Når resultatet kommer tilbake skal listeboksen populeres med forslagene, men bare hvis teksten i tekstboksen ikke er blitt endret i mellomtiden.”
Dette kan vi løse slik i WinForms (all koden skal i Form-subklassen sin constructor eller OnLoad()):
- Den sentrale eventen her er
TextBox.TextChanged, så vi må pakke den inn i enIObservable. Forutsatt at en passende extension event er definert, kan vi gjøre det slik:var textChanged = this.SearchBox.GetTextChanged();
Hvis ikke, må vi skrive litt mer kode:
var textChanged = Observable.FromEvent<EventArgs>(this.SearchBox, "TextChanged");
- Web service-kallet må pakkes inn i en asynkron funksjon:
var webService = new AutocompletionWebService(); var asyncSearch = Observable.ToAsync(text => webService.GetSuggestions(text));
- Vi må definere at vi kun er interessert i tekstendringer etter ett sekund med inaktivitet, og at for hver slik tekstendring skal web servicen kalles. I tillegg er vi bare interessert i resultatet fra web servicen dersom ikke teksten er blitt endret i mellomtiden.
var searchResult = from tc in textChanged let text = this.SearchBox.Text from waitHere in Observable.Return(new Unit()) .Delay(1000) .Until(textChanged) from results in asyncSearch(text).Until(textChanged) select new { text, results };
- Til sist må vi abonnere på evenstrømmen vi har laget, og sørge for at vi mottar notifikasjonene på GUI-tråden:
searchResult.SubscribeOnWindowsForms(this).Subscribe(t => { this.ResultList.Items.Clear(); this.ResultList.Items.AddRange(t.results); });
Voilà! Det samme vil selvfølgelig være mulig å gjøre uten Rx, men det vil kreve en intrikat kombinasjon av timere og tilstandsvariabler inni event handlers – som det ikke vil være særlig trivielt å teste. Med Rx får vi uttrykt det samme mye mer konsist, og med mindre risiko for å gjøre noe feil. I denne sammenhengen kan det også nevnes at Rx allerede blir brukt internt i Microsoft sin kode – i enhetstestene til Silverlight.
Da er det bare å slippe kreativiteten løs; hvis noen har eksempler på spennende ting de har gjort i Rx, må man gjerne poste det i en kommentar!
Referanser
- Introduksjon til Rx og asynkron programmering
- Forklaring av dualiteten mellom Iterator pattern og Observer pattern
- “Extension events” og diskusjon om hvordan cross joins fungerer
- Drag-and-drop med Rx
- Inkrementelt søk med asynkron programmering i Rx
- Foredrag om
IObservable/IObserver - Foredrag om Rx
- Foredrag om Rx
- Bindable LINQ (et rammeverk basert på mye av den samme tankegangen som Rx)