Reading and Writing Data with S3 as Source and Sink
Introduction
Apache Flink is a powerful stream processing framework for real-time data processing. Setting up Flink with Java, however, can be challenging and time-consuming: it involves configuring numerous dependencies, writing a fair amount of code, and ensuring compatibility between components, and you need to be comfortable managing Java environments and troubleshooting issues during setup and execution.
To ease these challenges, this guide takes a simplified approach using Flink SQL. Flink SQL provides a more intuitive and streamlined way to interact with data, reducing the complexity inherent in programming with Java; you can run queries and manage data processing tasks with minimal setup and configuration.
This guide walks you through setting up Apache Flink on an EC2 instance, reading data from S3 as a source, and saving data to another S3 location as a sink using the Flink S3 connector and simple SQL queries.
Prerequisites
• An AWS account
• Access to EC2 and S3 services
• An EC2 instance with sufficient resources
• Basic knowledge of Python and AWS
• Apache Flink installed on your local machine for testing
Installation
Launch EC2 Instance
• Log in to AWS Management Console
• Navigate to EC2 Dashboard and click "Launch Instance"
• Select an appropriate Amazon Linux AMI and an instance type (preferably t3.2xlarge, which has 8 vCPUs and 32 GB of memory)
• Configure instance details, add storage (EBS), and tag the instance
• Configure the security group to allow the necessary ports (add an inbound rule allowing your local machine's IP on port 8081, which will be used to view the Flink dashboard from your local browser)
• Review and launch the instance
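If you prefer the AWS CLI, a roughly equivalent launch command is sketched below; the AMI ID, key pair, and security group ID are placeholders for your own values:
aws ec2 run-instances \
    --image-id ami-0123456789abcdef0 \
    --instance-type t3.2xlarge \
    --key-name my-key-pair \
    --security-group-ids sg-0123456789abcdef0 \
    --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=flink-demo}]'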
Install Apache Flink and Other Dependencies
• SSH into your EC2 instance
• Download the Apache Flink tarball (preferably the latest version) from the official website (wget https://www.apache.org/dyn/closer.lua/flink/flink-1.19.2/flink-1.19.2-bin-scala_2.12.tgz)
• Extract the tarball (tar -xvzf flink-1.19.2-bin-scala_2.12.tgz) and move it to a suitable directory (mv flink-1.19.2 /mnt/)
• Set up Java 11 or above
List available Java versions: sudo yum list available | grep java
Install a suitable Java from the list: sudo yum install java-11-openjdk-devel
Check that Java is installed: java -version
openjdk version "11.0.27" 2025-04-15 LTS
OpenJDK Runtime Environment Corretto-11.0.27.6.1 (build 11.0.27+6-LTS)
OpenJDK 64-Bit Server VM Corretto-11.0.27.6.1 (build 11.0.27+6-LTS, mixed mode)
• Install the latest Python 3 version
Install Python: sudo yum install python3
If Python 3 is installed but not accessible via the python command, create a symlink: sudo ln -s /usr/bin/python3 /usr/bin/python
Check that /usr/bin is part of the output: echo $PATH
Check the Python version: python --version
Python 3.9.22
• Install the PyFlink library: pip3 install apache-flink
• Check that AWS services such as S3 are accessible from the EC2 instance. If not, define your user's ACCESS_KEY and SECRET_KEY in the AWS credentials file, or use the default credentials. In this case we used the default credentials of the IAM role assigned to the EC2 instance.
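Two quick checks (the bucket name below is a placeholder):
Verify which identity the instance is using: aws sts get-caller-identity
Confirm a bucket is reachable: aws s3 ls s3://my-source-bucket/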
• Download the flink-s3-fs-hadoop plugin (same version as your Apache Flink installation) from the Flink distribution or the official Flink website
Go to the plugins directory: cd /mnt/flink-1.19.2/plugins/
Download the plugin: wget https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.19.2/flink-s3-fs-hadoop-1.19.2.jar
Make a directory to hold the jar: mkdir flink-s3-fs-hadoop
Move the jar file into the new directory: mv flink-s3-fs-hadoop-1.19.2.jar flink-s3-fs-hadoop/
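Verify the jar is in place: ls flink-s3-fs-hadoop/
flink-s3-fs-hadoop-1.19.2.jar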
• Modify the Flink config file so that the dashboard is reachable from outside the instance once the cluster starts.
Open the config file: vi conf/config.yaml
Modify rest.bind-address: replace localhost with 0.0.0.0
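The relevant line should then read:
rest.bind-address: 0.0.0.0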
• Add the following S3 settings at the end of the same config file:
s3.endpoint: s3.ap-southeast-3.amazonaws.com # S3 endpoint of your bucket's region
s3a.credentials.provider: org.apache.hadoop.fs.s3a.DefaultAWSCredentialsProviderChain
s3.connection.timeout: 5000 # in milliseconds
s3.request.timeout: 10000 # in milliseconds
• Start the Flink cluster: ./bin/start-cluster.sh
• Once started, verify that you can open the Flink dashboard in your browser at http://<EC2_PUBLIC_IP>:8081/
Check Environment Readiness
• Verify Java installation: java -version
• Ensure the Flink cluster is up and running: ps aux | grep flink
• Confirm connectivity to S3 buckets: aws s3 ls
Creating a Python Job
• Import the PyFlink library for environment creation
• Create the execution environment
• Configure S3 access (preferably the default credentials of the IAM role; other options are available)
• Create the source table using Flink SQL
• Create the sink table using Flink SQL
• Transfer the data from source to sink using Flink SQL (a complete example script covering these steps follows this list)
• Test the script in EC2 using the command: ./bin/flink run -py s3_flink_job.py
• Once the job is submitted successfully, view the logs on the Flink dashboard in the browser; successful and failed runs appear in the “Completed Jobs” section.
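Below is a minimal sketch of s3_flink_job.py covering the steps above. The bucket names, paths, and CSV schema are placeholders to adapt to your data, and credentials are assumed to come from the instance's IAM role as configured earlier:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a Table API environment. Batch mode is used here so the filesystem
# sink finalizes its output files when the job ends, without requiring
# checkpointing to be configured.
env_settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(env_settings)

# S3 access: with an IAM role attached to the EC2 instance, the default
# credential chain configured in conf/config.yaml is picked up automatically,
# so no access keys are set in the job itself.

# Source table backed by S3 (bucket, path, and schema are placeholders)
t_env.execute_sql("""
    CREATE TABLE source_table (
        id STRING,
        amount DOUBLE,
        event_time STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://my-source-bucket/input/',
        'format' = 'csv'
    )
""")

# Sink table pointing at another S3 location
t_env.execute_sql("""
    CREATE TABLE sink_table (
        id STRING,
        amount DOUBLE,
        event_time STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://my-sink-bucket/output/',
        'format' = 'csv'
    )
""")

# Transfer the data from source to sink and wait for the job to finish
t_env.execute_sql("INSERT INTO sink_table SELECT * FROM source_table").wait()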
Running the Job Using Managed Flink Cluster
• Package your Python script and its dependencies into an application archive (for Python applications this is a zip file; see the sketch after this list)
• Upload the package to an S3 location
• Create an Apache Flink application in AWS Managed Apache Flink (MAF)
• Once the application is created, configure it with the uploaded package's S3 path and bucket details
• Run the job from the console once all the configurations are completed and verified
• Monitor the job submission and execution
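A sketch of the packaging and upload steps, assuming the AWS CLI is configured and my-artifacts-bucket is a placeholder bucket name:
zip s3_flink_job.zip s3_flink_job.py
aws s3 cp s3_flink_job.zip s3://my-artifacts-bucket/jobs/s3_flink_job.zip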
Monitoring the Job and Handling Errors
• Access Flink Dashboard to monitor job
• Check task manager logs to debug issues
• Implement retry and error handling mechanisms (a restart-strategy sketch follows this list)
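One option is a fixed-delay restart strategy added to conf/config.yaml; the attempt count and delay below are illustrative values:
restart-strategy.type: fixed-delay
restart-strategy.fixed-delay.attempts: 3 # retry a failed job up to 3 times
restart-strategy.fixed-delay.delay: 10 s # wait 10 seconds between attempts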