Introducing FlinkSQL in Cloudera Streaming Analytics

Our 1.2.0.0 release of Cloudera Streaming Analytics Powered by Apache Flink brings a wide range of new functionality, including support for lineage and metadata tracking via Apache Atlas, support for connecting to Apache Kudu and the first iteration of the much-awaited FlinkSQL API.

Flink’s SQL interface democratizes stream processing, as it caters to a much larger community than the widely used Java and Scala APIs, which focus on the data engineering crowd. Generalizing SQL to stream processing and streaming analytics use cases poses a set of challenges: we have to express infinite streams and the timeliness of records. Consider the following query:

SELECT
  userId,
  COUNT(*) AS `count`,
  SESSION_START(clicktime, INTERVAL '30' MINUTE)
FROM clicks
GROUP BY
  SESSION(clicktime, INTERVAL '30' MINUTE),
  userId;

This query produces the count of clicks per user session, where a session is defined by 30 minutes of inactivity, and the result is updated live as new sessions are encountered. This is an example of a concept that is already well established in stream processing, in this case session windowing, being introduced into the SQL syntax to express the timeliness of records. It is important to highlight that the syntax supported by Flink is ANSI SQL, not a Flink-specific dialect. In fact, the Flink community is collaborating with the Apache Beam and Apache Calcite communities to tackle the challenges of FlinkSQL in a unified manner.

Transforming streaming organizations

Looking at the above query, it is evident that a significantly larger user base than before can effectively formulate queries and thus add value to the business. However, it also poses the following questions for the organization:

  1. How much of the business logic can be formulated in SQL in the streaming domain?
  2. How does this transform the journey of streaming jobs from development to production?
  3. How does this affect the scope of the Data Engineering team?

We believe that the majority of streaming queries written today can be expressed via FlinkSQL; as an educated guess, we expect that on the order of 80% of the streaming queries we encounter today lend themselves well to implementation via this SQL API. This might seem like an overstatement at first, so we dive into the details in the next section.

Currently, we often encounter organizations using Flink where deriving business value in near-real-time is the privilege of the data engineers. The data analyst crowd, whose members are often the experts in domain-specific knowledge, tends to resort to snapshots of these streams stored in standard MPP or OLAP systems, for example querying data stored in Kudu via Apache Impala. This inherently introduces a gap between finding an insight and putting it into production in a streaming fashion. After proving their hypothesis, an analyst has to secure funding for a project with a couple of data engineers for weeks or even months to meticulously reimplement business logic that has already been formulated in a different language, typically SQL. FlinkSQL empowers analysts to interact with the streams directly and to deploy streaming jobs with the click of a button.

This in turn frees the data engineers to focus on the challenging 20% of the queries and on building reusable domain-specific libraries that can be leveraged directly from SQL as a suite of user-defined functions.

FlinkSQL Functionality

To showcase the functionality of FlinkSQL we have recently released an SQL tutorial under our standard suite of tutorials. Let us highlight some of the features here.

The tutorial operates on an Apache Kafka topic containing transactions of items in JSON format. Let us define a table schema for this and specify that we would like to measure the passing of time as recorded by the timestamp column (referred to as event-time semantics).

CREATE TABLE ItemTransactions (
  transactionId    BIGINT,
  `timestamp`      BIGINT,
  itemId           STRING,
  quantity         INT,
  event_time AS CAST(from_unixtime(floor(`timestamp`/1000)) AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector.type'         = 'kafka',
  'connector.version'      = 'universal',
  'connector.topic'        = 'transaction.log.1',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.bootstrap.servers' = '<broker_address>',
  'format.type'            = 'json'
);

Note that when working with event-time semantics we have to specify a watermark to provide a heuristic for Flink to measure the passing of event time. This can be an arbitrary expression returning a timestamp. At a high level, the watermark specifies the tradeoff between correctness (waiting indefinitely for potential late arrivals) and latency (producing results as quickly as possible).
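
To illustrate that tradeoff, the WATERMARK clause in the table definition above could be tightened or relaxed. The snippets below show only the changed clause (the rest of the DDL stays the same), and the delays are illustrative rather than recommendations:

-- No delay: lowest latency, but records arriving even slightly out of order are treated as late
WATERMARK FOR event_time AS event_time

-- One-minute delay: waits longer for stragglers before windowed results are emitted
WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE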

Once we have created the above table we can submit the following queries:

SELECT * FROM ItemTransactions LIMIT 10;

SELECT
  TUMBLE_START(event_time, INTERVAL '10' SECOND) AS window_start,
  itemId,
  SUM(quantity) AS volume
FROM ItemTransactions
GROUP BY itemId, TUMBLE(event_time, INTERVAL '10' SECOND);

The first query provides straightforward sampling. Using the LIMIT clause is optional; omitting it leads to the results being constantly updated in a streaming fashion. The second query implements a simple windowed aggregation. The results of these queries can be returned to the interactive Flink SQL CLI, or they can be written directly to output tables via INSERT INTO statements, as sketched below.
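
As a sketch of the INSERT INTO path, we can define a Kafka-backed output table in the same style as the source table and continuously write the windowed aggregation results into it. The table name, topic, and exact connector properties below are illustrative and may need adjusting to your environment:

CREATE TABLE ItemVolumes (
  window_start TIMESTAMP(3),
  itemId       STRING,
  volume       INT
) WITH (
  'connector.type'    = 'kafka',
  'connector.version' = 'universal',
  'connector.topic'   = 'transaction.volumes.1',
  'connector.properties.bootstrap.servers' = '<broker_address>',
  'format.type'       = 'json'
);

-- Continuously write the per-window volumes into the output table
INSERT INTO ItemVolumes
SELECT
  TUMBLE_START(event_time, INTERVAL '10' SECOND) AS window_start,
  itemId,
  SUM(quantity) AS volume
FROM ItemTransactions
GROUP BY itemId, TUMBLE(event_time, INTERVAL '10' SECOND);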

FlinkSQL also offers more complex constructs; for example, finding the top 3 items with the highest number of transactions in every 10-minute window can be formulated as follows:

SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start
      ORDER BY num_transactions DESC
    ) AS rownum
  FROM (
    SELECT
      TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start,
      itemId,
      COUNT(*) AS num_transactions
    FROM ItemTransactions
    GROUP BY itemId, TUMBLE(event_time, INTERVAL '10' MINUTE)
  )
)
WHERE rownum <= 3;

Apart from these built-in language elements, you can register functions implemented in Java and Scala in your FlinkSQL environment.
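
As a minimal sketch of how such a function could be exposed to SQL, assume a hypothetical scalar function class com.example.udf.MaskItemId is available on the job's classpath; depending on the release, functions can also be registered through the SQL client's environment configuration instead of DDL:

-- Register the (hypothetical) Java implementation under a SQL name
CREATE FUNCTION maskItemId AS 'com.example.udf.MaskItemId' LANGUAGE JAVA;

-- Once registered, it can be called like any built-in function
SELECT maskItemId(itemId) AS masked_item, quantity
FROM ItemTransactions;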

FlinkSQL also supports accessing external catalogs to tap into the schema and data stored in external systems; currently, we support the Hive, Kudu, and Schema Registry catalogs.
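
Once a catalog has been registered in the SQL environment, its tables can be explored and queried directly. A brief sketch, using a hypothetical catalog and table name:

-- List the catalogs registered in this environment and switch to one of them
SHOW CATALOGS;
USE CATALOG kuduCatalog;   -- hypothetical catalog name
SHOW TABLES;

-- Query a table whose schema is supplied by the external catalog
SELECT * FROM item_inventory LIMIT 10;   -- hypothetical table name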

Next steps

In our current release, the two options for submitting SQL queries are to use the SQL CLI or to wrap them into Java programs. We are actively working on a graphical user interface to aid interactive query editing, as discussed in our recent keynote at Flink Forward San Francisco.

Following the GUI addition, we will shortly expose its programmatic backend to third-party tools, providing an interface that can be the equivalent of JDBC for FlinkSQL, most likely built on REST and Kafka.

Learn more

If you are interested in learning more about FlinkSQL and other innovations in Cloudera Streaming Analytics, attend the final episode of our popular Power Chat webinar series – Flink Power Chat 4: A Best Practices Checklist for Developing in Apache Flink.
