RxJS ile Reaktif Programlamaya Giriş

Ünlü JavaScript Kütüphanesi RxJS'i Kullanarak Veri Akışlarını Yönetin

05.09.2018 Çarşamba
RxJS
12 dakika

Ön yüzde çalışan insanların zaman uyumsuz (asynchronous) ve koşutzamanlı (concurrent) işlerle başı hep beladadır. Sınırsız kullanıcı etkileşiminin, XHR (XMLHttpRequest) isteklerinin, web soketlerinin bulunduğu çok etkenli bir dünyadan bahsediyoruz. Yetmezmiş gibi, her an her şey ters gidebiliyor. Örneğin, tam canlı sohbet sırası size gelmişken bağlantı kopabiliyor. 😡 Ve ön yüz (front-end) yazılımcısının tüm bunları idare etmesi gerekiyor. Eh, bizim başımız belada olmasın da kimin olsun, değil mi?

İşte, böyle bir ortamda, veri ve olayları (event) sıraya sokmak adına, şimdiye dek pek çok yöntem ortaya kondu ve JavaScript giderek artan bu gereksinim yönünde evrimleşti:

  • Callback (artçı çağrı — bunu geri çağrım diye çevirmek resmen “chicken translate”)
  • Promise (söz, vaat, ??? 🤔)
  • Async Function, Iterator & Generator (zaman uyumsuz fonksiyon, yineleyici ve üretici — namıdiğer async/await)
  • Data/Event Stream (veri/olay akışı)
  • Callbag (bag bill = torba yasa => callbag = çağrı torbası)

Her birinin diğerlerine göre üstün ve eksik yönleri var. Karşılaştırmayı başka bir yazıya bırakıp, RxJS’le veri/olay akışlarını kullanarak neler yapabileceğimize bakalım.

📘 JavaScript’te tek iş parçacığı (thread) vardır ve koşutzamanlılık olay döngüsünün (event loop) üzerine kurgulanmıştır. Bu aslında konumuzla ilgili, ancak başka bir yazıda ayrıntılı olarak ele alamak gerek. Daha fazlası için şimdilik bkz. Concurrency Model and Event Loop

Akışı durdurmadan trafiği yönlendirmek sanattır...
Fotoğraf: Nick Fewings, Unsplash

RxJS Nedir?

RxJS, olay ve veri kaynaklarını abone olunabilir (subscribable) nesnelere dönüştürüp, bunlar üzerinde operatörler yardımıyla dönüşümler gerçekleştirebildiğiniz, gözlemleyenler (observer) aracılığıyla sonucu tüketebildiğiniz JavaScript’le yazılmış bir reaktif programlama (reactive programming) kütüphanesidir.

RxJS’i anlamayı kolaylaştıracak kadarıyla şu sıkıcı sınıf ve nesnelerini bir inceleyelim. 😒

Observable

  • RxJS’in temel sınıfı. Gözlemlenen.
  • Somutlaştırıldığında (instantiate) gözlemlenebilir bir nesne döner.
  • Herhangi bir veri veya olay Observable’a dönüştürülebilir.
  • RxJS’te bu amaçla geliştirilmiş birçok fabrika fonksiyon (factory function) vardır.
  • Bir Observable’ın neyi ne zaman yayınlayacağını belirlemekte operatörler kullanılır.
  • Yine operatörler aracılığıyla bir Observable’dan başka bir Observable türetilebilir.
  • Akışı tüketmek amacıyla her Observable’da bir subscribe metodu bulunur.

Observer

  • Observable’a abone olurken (subscribe) bağladığınız artçı çağrı (callback) seti. Gözlemleyen.
  • Observable’ın yayınladığı değer ve uyarıları tüketmeye yarar.
  • Üç çağrı tipi vardır:

    • Veriyi kullanmak için next
    • Hata işlemek için error
    • Observable’ın işinin bittiği durum için complete

Subscription

  • Bir Observable’a abone olduğunuzda dönen nesne. Abonelik.
  • Her Subscriptio’da onu sonlandıran bir unsubscribe metodu bulunur.
  • Bir Subscription sonlandı diye Observable yok olmaz; sadece kaynak tüketmeyi bırakır.
  • İşi biten (complete) Observable’a olan tüm Subscription’lar kendiliğinden sonlanır.

Subject

  • Hem Observable gibi abone olunabilen, hem de Observer’daki tüm metotları (next, error, complete) barındıran bir sınıf.
  • İstediğiniz anda bir değer yayınlamanıza olanak verir.
  • Asıl amacı bir Observable’ın birden fazla tüketiciye aynı yayını yapabilmesini sağlamaktır.
  • Hatalı kullanıma elverişlidir. Sık ve yersiz kullananlar mahallede sevilmez.

ReplaySubject

  • Özel bir Subject türüdür.
  • Somutlaştırılırken belirtilen arabellek boyutu (buffer size) kadar değeri tutabilir.
  • Örn. new ReplaySubject(3) en çok üç değer biriktirebilir.
  • Gecikmeli Subscription yaptığınızda o ana dek biriktirdiklerini bir anda alırsınız.

BehaviorSubject

  • Özel bir Subject türüdür.
  • Boyutu 1 olan bir ReplaySubject gibi davranır.
  • Subject ve ReplaySubject’in aksine, somutlaştırılırken bir başlangıç değeri alır.
  • Örn. new BehaviorSubject(‘test’) için başlangıç değeri ‘test’tir.

AsyncSubject

  • Özel bir Subject türüdür.
  • Boyutu 1 olan ve tuttuğu değeri sadece tamamlanma (complete) durumunda yayınlayan bir ReplaySubject gibi davranır.

Bu kadar teori yeter. Tümünün derin açıklamalarını yapmak isterdim; ama zamanımızı daha kullanışlı bilgilerle değerlendirmek niyetindeyim. Şimdi biraz uygulama yapalım.

Iterable - Observable İlişkisi

Konuya iterable (özyinelenen — ah be Türkçem 🇹🇷) kavramından başlamamın sebebi şu: Observable, kullandığınız metotlar ve davranış olarak dizi (array) gibi iterable tiplerle yadsınamaz bir benzerlik gösteriyor. Buyrun:

Array Observable
from from
of of
concat concat
map map
filter filter
reduce reduce
slice skip + take
[ 0 ] first
reverse + [ 0 ] last
flatMap 🆕 flatMap

Peki bu benzerlik uygulamada nasıl? Basit bir dizi örneğiyle başlayalım.

const {from} = Array;

const foo = from([
  undefined,
  'metin',
  999,
  false,
  () => {},
  new Date('August 30, 1922 19:05:19'),
  new Promise(resolve => resolve('RxJS')),
]);

const bar = foo
  .map(item => typeof item);
  .filter(type => type !== 'undefined');

foo.forEach(value => console.log(`foo: ${value}`));
bar.forEach(value => console.log(`bar: ${value}`));

/* LOG
foo: undefined
foo: metin
foo: 999
foo: false
foo: function () { }
foo: Wed Aug 30 1922 19:05:19 GMT+0300 (GMT+03:00)
foo: [object Promise]
bar: string
bar: number
bar: boolean
bar: function
bar: object
bar: object
*/

Herhalde, bu örnek yeterince açık. Önce from metodunu kullanarak foo adında bir dizi oluşturduk. Sonra bunu map yardımıyla dönüştürüp filter metoduyla istenmeyen öğeleri eleyerek bar adlı başka bir dizi daha oluşturduk. Son olarak iki dizinin de tüm öğelerini konsola yazdırdık. Şimdi de basit bir Observable örneği görelim:

import {from} from 'rxjs';
import {filter, map} from 'rxjs/operators';

const foo = from([
  undefined,
  'metin',
  999,
  false,
  () => {},
  new Date('August 30, 1922 19:05:19'),
  new Promise(resolve => resolve('RxJS')),
]);

const bar = foo.pipe(
  map(item => typeof item),
  filter(type => type !== 'undefined'),
);

foo.subscribe(value => console.log(`foo: ${value}`));
bar.subscribe(value => console.log(`bar: ${value}`));

/* LOG
foo: undefined
foo: metin
foo: 999
foo: false
foo: function () { }
foo: Wed Aug 30 1922 19:05:19 GMT+0300 (GMT+03:00)
foo: [object Promise]
bar: string
bar: number
bar: boolean
bar: function
bar: object
bar: object
*/

Gördüğünüz gibi, konsola yazdırılanların hiçbir farkı yok. İki kod arasındaysa sadece ufak farklar var. 🔬 Elbette, bu RxJS’in gücünü ortaya koyamayan, niteliksiz bir örnek. Yine de benzerlikleri ve temel kavramları gözlemlemek adına yararlı. O halde bulgularımızı listeleyelim:

  • Observable yaratan fabrika fonksiyonları 'rxjs' yolundan, operatörleriyse 'rxjs/operators' yolundan içe aldık (import).
  • from fabrika fonksiyonunu kullanarak foo isimli Observable nesneyi elde ettik.
  • Her Observable’da bulunan pipe metodunu operatörleri zincirlemekte kullandık.
  • map ve filter operatörlerini kullanarak foodan bar Observable nesnesini türettik.
  • Yine her Observable’da bulunan subscribe metodunu kullanarak ikisinin de yayınladığı tüm değerleri konsola yazdırdık.

“Bu mudur yani? Çok basitmiş.” dediğinizi duyar gibiyim. Evet, Observable nesnelerin genel kullanımı gerçekten de bu kadar. Ama biz bununla yetinmeyip, daha derine ineceğiz. 🐟 Nitekim, reaktif programlamanın nimetlerini görme zamanı geldi.

📘 TypeScript kullananlara kısa bir not: Tipleri ‘rxjs’ yolundan içe alıyorsunuz.

Gerçek Yaşamdan Örnekler

Bu kadar altyapı yeter. Şimdi biraz da RxJS’in gerçek yaşamda karşımıza çıkabilecek isterleri çözmede bize nasıl yardımcı olabileceğine bakalım.

Aşağıdaki örneklerin tümünü StackBlitz’e de yükledim. 👈 Ayrıca, kodun içine de açıklamalar ekledim ki adım adım takibi kolay olsun. Ha, bir de, $ işareti “stream” kelimesini simgeliyor, bilginize…

RxJS’le Çift Tıklamaları Diğerlerinden Ayırma

“Ne olacak yahu, dblclick olayını dinlerim.” demeyin: Burada marifet çift olmayan tıklamaları da algılayabilmek ve eğer çift tıklama söz konusuysa kesinlikle ona göre davranmak. Bunu elbette başka yöntemlerle de yapabilirsiniz; ama bu kadar okunaklı olacağını sanmam.

import {fromEvent} from 'rxjs';
import {buffer, debounceTime, partition, map} from 'rxjs/operators';

// İşaretli alanı seç.
const area = document.querySelector('#area');

// Bu alana yapılan tüm tıklamalardan bir akış oluştur.
const click$ = fromEvent(area, 'click');

// Tıklama akışına 0.3s ara verildiğinde yayın yapan bir başka akış türet.
const debouncedClick$ = click$.pipe(debounceTime(300));

// Yine tıklama akışından...
const clickCount$ = click$.pipe(

  // ara verilinceye kadar olanları biriktir...
  buffer(debouncedClick$),

  // ve bunların sayısını bul.
  map(clicks => clicks.length),
);

// Çift tıklamalar ve diğerleri diye iki akış elde etmek için...
const [doubleClick$, otherClick$] = clickCount$.pipe(

  // tıklamaları sayıp bölümle.
  partition(count => count === 2),
);

// Şimdi konsola yazdırmak için hem doubleClick$'e...
doubleClick$.subscribe(
  () => console.log(`Çift tıklama tespit edildi.`)
);

// hem de otherClick$ abone ol.
otherClick$.subscribe(
  () => console.log(`Tıklama oldu; ama çift değil.`)
);

Bu örnekte önemli bir fabrika fonksiyon ile üç de yeni operatör var:

  • fromEvent, herhangi bir olayı Observable’a çevirir.
  • debounceTime, bir Observable belirttiğiniz süre geçmeden tekrar yayın yaparsa, önceki yayını yok sayar.
  • buffer, içine verilen uyarıcı (notifier) yayın yapana dek bir Observable’ın yaydıklarını biriktirebilmenizi sağlar.
  • partition, bir Observable’ın yaydığı değerleri iki gruba ayırmaya yarar.

RxJS’le Uzun Tıklamaları Tespit Etme

İçerisinde barındırdığı ve birazdan açıklayacağım bir kavram dolayısıyla, aşağıdaki kod reaktif programlamaya yeni başlayanlar için belki biraz karışık görünebilir; ama bu örneği iyice anlamak, RxJS’i kullanabilmek açısından oldukça önemli. 🔑 Gerekirse özümseyene kadar üzerinden tekrar geçin. Özyineleyin yani…

import {fromEvent, merge, of} from 'rxjs';
import {delay, filter, mapTo, mergeMap, takeUntil} from 'rxjs/operators';

// İşaretli alanı seç.
const area = document.querySelector('#area');

// Bu alanda fare basışlarından bir akış oluştur ve...
const mouseDown$ = fromEvent(area, 'mousedown');

// bunları sadece sol tuş olanlar kalacak şekilde ele.
const leftButtonDown$ = mouseDown$.pipe(
  filter(event => event.button === 0),
);

// Fareye basmayı bırakma eyleminden de bir akış oluştur.
const mouseUp$ = fromEvent(document, 'mouseup');

// Fareyi hareket ettirmeden de son bir akış daha oluştur.
const mouseMove$ = fromEvent(document, 'mousemove');

// Sol fare akışından bir başka akış türet ama...
const longPress$ = leftButtonDown$.pipe(

  // her basışta yeniden yaratılsın ve
  mergeMap(event => of(event).pipe(

    // 1s bekledikten sonra yayın yapsın,
    delay(1000),
    
    // beklerken de fare kaldırılır/hareket ederse sayılmasın.
    takeUntil(
      merge(mouseUp$, mouseMove$)
    ),
  )),

  // Akışın her yayını şu metne dönüştürülsün.
  mapTo(`Uzun tıklama tespit edildi.`)
);

// Tespit edilen uzun tıklamaları konsola yazdır.
longPress$.subscribe(console.log);

Şunlar sıradan fabrika fonksiyonu:

  • of tek bir değeri Observable’a dönüştürebilen, hayli yararlı bir fabrika fonksiyon.
  • merge iki Observable’ı birleştirip herhangi biri yayın yaptığında yayın yapan yeni bir Observable üretiyor.

Bunlar da sıradan operatörler:

  • delay, bir Observable’ı yayın yapmadan önce bekletmeye yarıyor.
  • takeUntil içerisinde verilen uyarıcı yayın yaptığında bağlı bulunduğu Observable’ı sonlandırıyor.
  • mapTo yayını fonksiyondan geçirerek dönüştürmek yerine sabit bir değere çeviriyor.

Ama şu değil: 😨

  • mergeMap üst seviye (higher-order) yani iç içe (nested) Observable türeten durumlarda birleştirip düzleştirme görevi görüyor. Zaten, diğer adı da flatMap.

Yine dizilerden örnek vererek nasıl çalıştığını anlatmaya çalışayım:

/*
Array.prototype.flatMap Chrome 69'da bulunmaktadır.
Chrome 69 bugün çıktı. Lodash'te benzer bir fonksiyon var.
*/
const foo = [1];                    // [1]
const bar = [foo.map(x => x * 2)];  // [[2]]
const baz = bar.flatMap(x => x);    // [2]

Yukarıda Array.prototype.flatMapin diziler üzerinde yaptığı gibi, mergeMap de üst seviye, yani iç içe Observable’lar üzerinde basit bir düzleştirme işlemi yapıyor. Böylece pipe işlemine kaldığınız yerden devam edebiliyorsunuz. Bir de, üstteki örnekte görüldüğü gibi, kendi yaşam döngüsü olan yeni bir Observable yarattığı için, bu yeni observable sonlandırılsa dahi, kaynak observable sonlandırılmıyor. Bu, örnekteki kodun yeniden çalışabilmesi için önemli.

RxJS’le Tıklamaları HTTP İsteğine Dönüştürme

Angular’ın olay bağlamasına (event binding), React’in yapay olaylarına (synthetic event) o kadar alıştık ki, aşağıdaki örnek bazılarınıza işlevsiz görünecektir. Ben aynı fikirde değilim. DOM üzerinden doğrudan olay dinlemeyi bir yana koyarsak, bu örnekte anlatılan yaklaşım size modern web uygulamalarında da yararlı olacaktır.

import {from, fromEvent} from 'rxjs';
import {debounceTime, map, switchMap, tap} from 'rxjs/operators';

// İşaretli alanı seç ve...
const area = document.querySelector('#area');

// bu alana yapılan tüm tıklamalardan bir akış oluştur.
const click$ = fromEvent(area, 'click');

// Bir sayaç başlat.
let counter = 0;

// Tıklama akışını al...
const magic$ = click$.pipe(

  // ve bir saniye ara verilmedikçe hiçbirini geçirme.
  debounceTime(1000),

  // Ara verildiğinde son tıklamayı sayaca dönüştür.
  map(() => ++counter),

  // Sayacı URL'in sonuna ekleyip adres elde et.
  map(id => `https://api.magicthegathering.io/v1/cards/${id}`),

  // Bu adrese istek yapmaya başladığını konsola yazdır...
  tap(endpoint => console.log(`Loading: ${endpoint}`)),

  // ve isteği yap. Yalnız bu bir Promise dönecek.
  map(endpoint => fetch(endpoint)),

  // Onu isteğin yanıtını bekleyen bir Observable'a çevir.
  switchMap(request => from(request)),

  // Yanıt da Promise. Onu da Observable'a çevir ve çözümle.
  switchMap(response => from(response.json())),

  // Bize kart lazım. Al onu içinden...
  map(({card}) => card || {}),

  // ve konsola yazdır.
  tap(console.log),
);

// Şimdi bütün bunların kullanılabilmesi için magic$'e abone ol.
magic$.subscribe({
  next: ({name}) => area.textContent = name,
  error: err => area.textContent = err,
  complete: () => console.log('Bitti.'),
});

İki yeni operatör daha var burada:

  • tap, yan etki (side-effect) adı verilen işleri yapabilmenize olanak tanıyor. Burada sistem günlüğüne kayıt atıyor.
  • switchMap, işlevi açısından mergeMap‘e benziyor. Tek farkı, bunu bir önceki akışı iptal ederek yapıyor olması.

📘 Eğer ne yaptığınızı bildiğinizden emin değilseniz, switchMap‘i tercih edebilirsiniz; çünkü genellikle aradığınız aslına odur. İptal etme özelliğiyle, sisteminize ek yük binmesine engel olabilir.

RxJS ile Eş Zamanlı HTTP İstekleri Yapma

Hemen herkesin gereksinim duyduğu bir konuya geldik. Modern JavaScript uygulamaları, XHR’ı yoğun bir şekilde kullanır ve bazen bir sayfayı açabilmek veya bir istek daha yapabilmek adına birkaç isteğin birden cevabını beklemeniz gerekir. İşte böyle durumlarda aşağıdaki gibi bir çözüm uygulayabilirsiniz.

import {forkJoin, from, fromEvent, Subject} from 'rxjs';
import {throttle, map, mapTo, switchMap} from 'rxjs/operators';

// İşaretli alanı seç.
const area = document.querySelector('#area');

// Daha sonra kullanmak üzere bir Subject oluştur.
const observer$ = new Subject();

// İşaretli alana yapılan tıklamalardan bir akış oluşturup...
const throttledClick$ = fromEvent(area, 'click').pipe(

  // Observer'dan uyarı gelene dek artçı tıklamaları görmezden gel.
  throttle(() => observer$),
);

// Tıklama akışını al ve...
const swapi$ = throttledClick$.pipe(

  // adreslere dönüştür.
  mapTo(
    [1, 2].map(id => `https://swapi.co/api/people/${id}`)
  ),

  // İki adrese de aynı anda istek yapmak için...
  switchMap(urls => forkJoin(

    // adresleri forkJoin'e dağıt (spread) ve...
    ...urls.map(

      // fabrika fonksiyon kullanarak...
      (url, i) => from(

        // isteğe dönüştür.
        fetch(url).then(response => response.json())
      )
    )
  )),

  // Sonuçları birlikte tüket.
  map(([p1, p2]) => `${p1.name} ve ${p2.name}`),
);

// Şimdi önce Subject'e abone ol...
observer$.subscribe({
  next: console.log,
  error: console.error,
});

// sonra da Subject'i swapi$'e Observer olarak bağla.
swapi$.subscribe(observer$);

Operatörlerden throttle, çok önemli değil. Zira, debounce gibi çalışıyor; sadece tersi. Yani ilk yayını dikkate alıyor, sonuncuyu değil. Sondaki “Time”ın olmadığını fark etmişsinizdir; o şekilde çalışanı da var. Bu haliyle belirli bir süre değil, bir uyarıcıdan gelecek yayını bekliyor.

Öte yandan, çok önemli iki konuyu ele almış durumda bu örnek:

  • Subjectin hem Observer olarak bir Observable’a verildiğine, hem de başka bir Observable tarafından uyarıcı olarak tüketildiğine tanık olduk.
  • forkJoin ile iki Observable’ı zaman kaygısı olmaksızın birleştirip kullandık. İşte tam anlamıyla hayat kurtaran bir fonksiyon. Ayrıca, forkJoin ile birleştireceğiniz Observable sayısı ikiyle sınırlı olmak durumunda da değil.

👾 Gözden kaçmış olabilir: Yazıdaki tüm örneklere StackBlitz üzerinden ulaşabilirsiniz.

Kapanış

Biraz vakit ayırıp diğer operatörlere de yakından bakarsanız çok yararlı olacaktır. Yalnız, RxJS’in resmi sayfası halen yapılıyor (ve biraz dağınık gidiyor). Onun yerine Learn RxJSe başvurabilirsiniz. Umuyorum ki yakın zamanda RxJS’le harika işler yapmaya hemen başlayabilirsiniz. Güç sizde artık. 💪

RxJS’in kullanım alanı, bir makaleye sığmayacak kadar geniş ve karşınızdaki problemin zorluk derecesi arttıkça, uyguladığınız teknikler de karmaşık ve dolambaçlı bir hal alabiliyor. İleride, operatör ve fabrika fonksiyonları daha yakından inceleyen yeni içerikler eklemeyi planlıyorum. Ayrıca, Subscription’ın nasıl bir şey olduğunu ve neden yok edilmesi gerektiğini de anlatmaya çalışacağım. İrtibatı koparmayalım.

Bitti. 🕺