Streaming

ZIO Elasticsearch offers several API methods for creating ZIO streams out of search requests. The library supports two streaming modes, which rely on two different ways of retrieving paged results from Elasticsearch: scroll and search_after. When using the Elasticsearch.stream(...) method, you can provide your own configuration by creating a StreamConfig object and passing it to the method alongside the SearchRequest. If you do not provide a StreamConfig, StreamConfig.Default is used.

StreamConfig.Default uses the Scroll API (which is recommended for queries that have under 10,000 results), sets the keep_alive parameter to 1m, and uses the Elasticsearch default page size.

StreamConfig also makes use of our fluent API, so you can use the methods withPageSize (which determines how many documents are returned per page) and keepAliveFor (which tells Elasticsearch how long the search should be kept alive after each page, specified in time units). StreamConfig has two predefined values: StreamConfig.Scroll, which uses the Elasticsearch Scroll API, and StreamConfig.SearchAfter, which uses the Search After API with Point In Time.

StreamConfig(searchAfter = false, keepAlive = "5m", pageSize = Some(100))
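
The fluent methods described above can also be chained onto the predefined values. The following is a minimal sketch, assuming that StreamConfig lives in the zio.elasticsearch package, that withPageSize takes an Int, and that keepAliveFor takes the same time-unit string as the keepAlive field; the names StreamConfigs, customScroll and customSearchAfter are illustrative only.

```scala
import zio.elasticsearch.StreamConfig

object StreamConfigs {
  // Scroll-based paging: 100 documents per page, search kept alive for 5 minutes.
  // Assumption: keepAliveFor accepts a time-unit string such as "5m".
  val customScroll: StreamConfig =
    StreamConfig.Scroll.withPageSize(100).keepAliveFor("5m")

  // Search After (with Point In Time) paging: 500 documents per page,
  // keeping whatever keep-alive the predefined value already carries.
  val customSearchAfter: StreamConfig =
    StreamConfig.SearchAfter.withPageSize(500)
}
```

Under these assumptions, customScroll should be equivalent to the explicit constructor call shown above.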

When using the streamAs[A] method, results are parsed into the desired type A, relying on an implicit schema for A.

final case class User(id: Int, name: String)

object User {
  implicit val schema: Schema.CaseClass2[Int, String, User] =
    DeriveSchema.gen[User]

  val (id, name) = schema.makeAccessors(FieldAccessorBuilder)
}

val request: SearchRequest =
  ElasticRequest.search(IndexName("index"), ElasticQuery.range(User.id).gte(5))

val searchAfterStream: ZStream[Elasticsearch, Throwable, User] =
  Elasticsearch.streamAs[User](request, StreamConfig.SearchAfter)
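
Because streamAs[A] returns an ordinary ZStream, the usual stream operators apply to the typed results. The sketch below uses only standard ZIO Streams operators (map, take, runCollect); the StreamUsage object and the firstNames helper are hypothetical names introduced for illustration, and User is redeclared only to keep the block self-contained.

```scala
import zio.{Chunk, ZIO}
import zio.stream.ZStream

object StreamUsage {
  // Same shape as the User case class defined above.
  final case class User(id: Int, name: String)

  // Collects the names of the first `n` users emitted by the stream.
  // Generic in R, so it works with any environment, including Elasticsearch.
  def firstNames[R](users: ZStream[R, Throwable, User], n: Int): ZIO[R, Throwable, Chunk[String]] =
    users.map(_.name).take(n.toLong).runCollect
}
```

Applied to the stream above, firstNames(searchAfterStream, 10) would produce a ZIO[Elasticsearch, Throwable, Chunk[String]] that still needs the Elasticsearch environment to be provided before it can run.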

Besides the type-safe streamAs[A] method, the library offers a basic stream method, which returns a stream of Item values. Each Item contains a raw field that represents the document using the Json type from the ZIO JSON library.

val request: SearchRequest =
  ElasticRequest.search(IndexName("index"), ElasticQuery.range("id").gte(5))

val defaultStream: ZStream[Elasticsearch, Throwable, Item] =
  Elasticsearch.stream(request)

val scrollStream: ZStream[Elasticsearch, Throwable, Item] =
  Elasticsearch.stream(request, StreamConfig.Scroll)
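
Since the raw field is a plain ZIO JSON Json value, it can be decoded manually when streamAs[A] is not a good fit. The following is a minimal sketch that relies only on ZIO JSON (Json#as and DeriveJsonDecoder); the RawDecoding object, its decodeUser helper and the JSON decoder for User are assumptions added for illustration and are not part of ZIO Elasticsearch.

```scala
import zio.json.{DeriveJsonDecoder, JsonDecoder}
import zio.json.ast.Json

object RawDecoding {
  // Same shape as the User case class defined above.
  final case class User(id: Int, name: String)

  object User {
    // ZIO JSON decoder, needed only for this manual decoding sketch.
    implicit val decoder: JsonDecoder[User] = DeriveJsonDecoder.gen[User]
  }

  // Decodes one raw document; a Left carries the ZIO JSON error message.
  def decodeUser(raw: Json): Either[String, User] =
    raw.as[User]
}
```

With the streams above, defaultStream.map(item => RawDecoding.decodeUser(item.raw)) would yield a stream of Either[String, User], leaving it to the caller to decide how to handle documents that fail to decode.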