readevents

Read raw events from Kafka stream without schema processing applied

Since R2022b

    This function requires Streaming Data Framework for MATLAB® Production Server™.

    Description

    event = readevents(ks) returns a structure array that contains raw events from the Kafka® stream ks. Each event in the stream creates an event structure in the resulting structure array. No schema processing is applied to the event.

    Examples

    Assume that you have a Kafka server running at the network address kafka.host.com:9092 that has a topic RecamanSequence.

    Create a KafkaStream object for reading from and writing to the RecamanSequence topic.

    ks = kafkaStream("kafka.host.com",9092,"RecamanSequence")
    ks = 
    
      KafkaStream with properties:
    
                      Topic: "RecamanSequence"
                      Group: "d89f5726-6abf-461d-a14e-4d40ab84c676"
                      Order: EventTime
                       Host: "kafka.host.com"
                       Port: 9092
          ConnectionTimeout: 30
             RequestTimeout: 61
              ImportOptions: "None"
              ExportOptions: "Source: function eventSchema"
              PublishSchema: "true"
                 WindowSize: 50
                KeyVariable: "key"
                KeyEncoding: "utf16"
                    KeyType: "text"
               KeyByteOrder: "BigEndian"
               BodyEncoding: "utf8"
                 BodyFormat: "JSON"
                  ReadLimit: "Size"
        TimestampResolution: "Milliseconds"

    Read 50 events, which is the default number of events, from the RecamanSequence topic.

    events = readevents(ks)
    events = 
    
      50×1 struct array with fields:
    
        key
        value
        timestamp

    readevents blocks other operations until it reads 50 messages or until 61 seconds pass without receiving a message. To strictly limit blocking time to 61 seconds even if more messages are available, specify ReadLimit="Time" in the call to kafkaStream. To change the timeout duration, for example, to 15 seconds, specify RequestTimeout=15 when you create the KafkaStream object ks.
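
    As a sketch of the two options just described, the stream can be created with both name-value arguments and then read. This assumes the same hypothetical server and topic as above, and that the arguments are passed in the Name=Value form used in the text; it is an illustration, not a verified session.

```matlab
% Sketch only: assumes a Kafka server at kafka.host.com:9092 with a
% RecamanSequence topic, as in the example above.
% ReadLimit="Time" caps blocking time at the request timeout;
% RequestTimeout=15 shortens that timeout from 61 to 15 seconds.
ks = kafkaStream("kafka.host.com",9092,"RecamanSequence", ...
    ReadLimit="Time",RequestTimeout=15);

% Returns the events read before the limit was reached.
events = readevents(ks);
```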

    Input Arguments

    ks: Object connected to a Kafka stream topic, specified as a KafkaStream object.

    Output Arguments

    event: Event information, returned as a structure array. Each structure in the array has these fields.

    key: Event key as stored in Kafka, returned as a string array or integer. The key identifies the event source.

    Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64 | char | string

    value: Event value, returned as a byte array with a format and encoding determined by the BodyFormat and BodyEncoding properties of the stream object. The event value does not undergo schema processing and appears exactly as it is stored in Kafka, for example, as a JSON string.

    Data Types: string | uint8 | uint16

    timestamp: Timestamp of event occurrence or timestamp of event ingestion in Kafka, returned as a datetime scalar.

    Data Types: datetime
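
    Because no schema processing is applied, decoding the value field is left to the caller. The following minimal sketch shows one way to do that for JSON bodies. It does not connect to Kafka: the events array is a hand-built stand-in with the same three fields, and the payload field names n and a are hypothetical. The byte-conversion branch assumes UTF-8 text, matching BodyEncoding "utf8".

```matlab
% Mock events shaped like the output of readevents.
% The JSON payloads and their fields (n, a) are hypothetical.
events = struct( ...
    "key",       {"seq1"; "seq1"}, ...
    "value",     {"{""n"":1,""a"":1}"; "{""n"":2,""a"":3}"}, ...
    "timestamp", {datetime(2022,9,1,12,0,0); datetime(2022,9,1,12,0,1)});

decoded = cell(numel(events),1);
for k = 1:numel(events)
    raw = events(k).value;
    if isnumeric(raw)
        % Raw bytes: assume UTF-8 encoded text and convert to characters.
        raw = native2unicode(uint8(raw(:)'),'UTF-8');
    end
    % Parse the JSON text into a MATLAB structure.
    decoded{k} = jsondecode(raw);
end
```

    Each element of decoded is then an ordinary structure, so a payload field can be read as, for example, decoded{2}.a.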

    Version History

    Introduced in R2022b