bhuma08

SQLAlchemy Core in Python: Getting Started

Introduction

SQLAlchemy is the Python SQL toolkit and Object Relational Mapper that gives application developers the full power and flexibility of SQL.

SQLAlchemy is split into two parts: an Object Relational Mapper (ORM) and Core. Core focuses on the schema and on a SQL expression API that works directly with tables and rows, whereas the ORM builds on Core and lets you give your code a more robust, domain-centric structure.
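
To make the distinction concrete, here is a minimal sketch that describes the same table both ways. The `Hero` class name is hypothetical, and the snippet assumes SQLAlchemy 1.4 or later, where `declarative_base` is importable from `sqlalchemy.orm` and `select()` takes its arguments positionally.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.orm import declarative_base

# Core: the table is a plain schema object.
metadata = MetaData()
heroes_table = Table(
    "heroes", metadata,
    Column("hero_id", Integer, primary_key=True),
    Column("name", String(20), nullable=False, unique=True),
)

# ORM: the same table is mapped to a Python class (hypothetical class name).
Base = declarative_base()


class Hero(Base):
    __tablename__ = "heroes"
    hero_id = Column(Integer, primary_key=True)
    name = Column(String(20), nullable=False, unique=True)


# Both styles produce a SELECT against the same "heroes" table.
core_sql = str(select(heroes_table))
orm_sql = str(select(Hero))
print(core_sql)
print(orm_sql)
```

The rest of this post uses only the Core style.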

Getting Started:

Installation and Setup of the Development Environment:
Download Anaconda.

You can then launch Jupyter Notebook from Anaconda Navigator.
To open and modify the database, you can download DB Browser for SQLite.

Connecting to the database using Jupyter:

from sqlalchemy import create_engine
engine = create_engine(...)
connection = engine.connect()
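
The engine URL is elided above. As a minimal sketch, the snippet below assumes an in-memory SQLite database, which needs no file or server. The URL is an assumption made for illustration, not the one used in the original setup.

```python
from sqlalchemy import create_engine

# Assumption: in-memory SQLite. A file-backed database would use
# a URL of the form "sqlite:///path/to/file.db".
engine = create_engine("sqlite:///:memory:")
connection = engine.connect()

dialect_name = engine.dialect.name  # name of the database backend
is_open = not connection.closed     # True while the connection is usable
print(dialect_name, is_open)

connection.close()
```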

To see if the connection is successful:

from sqlalchemy import text
# Raw SQL strings are wrapped in text() before being executed
query = text('SELECT * FROM users')
results = connection.execute(query)
results.fetchall()
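
For a check that can run without an existing database, the sketch below creates a throwaway `users` table first. The schema and the two rows are hypothetical, since the real `users` table is not shown in this post, and the in-memory SQLite URL is likewise an assumption.

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

with engine.connect() as connection:
    # Hypothetical schema and rows, created only so the SELECT has data to return.
    connection.execute(
        text("CREATE TABLE users (user_id INTEGER PRIMARY KEY, name TEXT)")
    )
    connection.execute(
        text("INSERT INTO users (name) VALUES ('alice'), ('bob')")
    )
    results = connection.execute(text("SELECT * FROM users"))
    rows = results.fetchall()

print(rows)
```

If the query returns rows instead of raising an error, the connection works.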

Examples of Querying and Aggregating Data:

from sqlalchemy import *
metadata = MetaData()
heroes_table = Table('heroes', metadata,
                    Column('hero_id', Integer, nullable=False, primary_key=True),
                    Column('name', String(20), nullable=False, unique=True),
)

from sqlalchemy.sql import select
# Select every column of heroes_table
query = select(heroes_table)
results = connection.execute(query)
results.fetchall()
# First five rows
query = select(heroes_table).limit(5)
# Skip the first five rows
query = select(heroes_table).offset(5)
# Sort by name, ascending
query = select(heroes_table).order_by(heroes_table.columns.name)
# Sort by name, descending
query = select(heroes_table).order_by(heroes_table.columns.name.desc())
# Rows 6 through 10: skip five, then take five
query = select(heroes_table).offset(5).limit(5)
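
To see ordering, offset, and limit work together, here is a small self-contained sketch. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database, and the four hero names are made up for the example.

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, insert, select)

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
heroes_table = Table(
    "heroes", metadata,
    Column("hero_id", Integer, nullable=False, primary_key=True),
    Column("name", String(20), nullable=False, unique=True),
)
metadata.create_all(engine)

with engine.connect() as connection:
    # Hypothetical hero names, inserted only so the query returns something.
    connection.execute(
        insert(heroes_table),
        [{"name": n} for n in ["Cara", "Abel", "Jade", "Boris"]],
    )
    # Sort by name, skip the first row, then take the next two.
    query = (
        select(heroes_table.columns.name)
        .order_by(heroes_table.columns.name)
        .offset(1)
        .limit(2)
    )
    page = [row[0] for row in connection.execute(query)]

print(page)  # ['Boris', 'Cara']
```

Without an `order_by`, the database is free to return rows in any order, so paging with `offset` and `limit` is only predictable on a sorted query.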

Using WHERE:

# Single condition
query = select(heroes_table).where(heroes_table.columns.hero_id > 5)
# Two conditions combined with &; each comparison needs its own parentheses
query = select(heroes_table).where((heroes_table.columns.hero_id > 5) & (heroes_table.columns.hero_id < 15))

Using AND

# and_() is the function form of the & operator
query = select(heroes_table).where(and_(heroes_table.columns.hero_id > 5, heroes_table.columns.hero_id < 15))
# | is the operator form of OR: either condition may match
query = select(heroes_table).where((heroes_table.columns.hero_id < 5) | (heroes_table.columns.hero_id > 15))

Using OR

# or_() is the function form of the | operator
query = select(heroes_table).where(or_(heroes_table.columns.hero_id < 5, heroes_table.columns.hero_id > 15))
# Exact match on a string column
query = select(heroes_table).where(heroes_table.columns.name == 'Jade')
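
The operator forms (`&`, `|`) and the function forms (`and_`, `or_`) should select the same rows. The sketch below checks this against twenty made-up heroes. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database, and the `ids` helper is a hypothetical convenience function, not part of SQLAlchemy.

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        and_, create_engine, insert, or_, select)

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
heroes_table = Table(
    "heroes", metadata,
    Column("hero_id", Integer, nullable=False, primary_key=True),
    Column("name", String(20), nullable=False, unique=True),
)
metadata.create_all(engine)

hero_id = heroes_table.columns.hero_id

with engine.connect() as connection:
    # Hypothetical rows with hero_id 1 through 20.
    connection.execute(
        insert(heroes_table),
        [{"hero_id": i, "name": f"hero_{i}"} for i in range(1, 21)],
    )

    def ids(condition):
        """Return the sorted hero_id values matching a WHERE condition."""
        query = select(hero_id).where(condition).order_by(hero_id)
        return [row[0] for row in connection.execute(query)]

    inside_op = ids((hero_id > 5) & (hero_id < 15))
    inside_fn = ids(and_(hero_id > 5, hero_id < 15))
    outside_op = ids((hero_id < 5) | (hero_id > 15))
    outside_fn = ids(or_(hero_id < 5, hero_id > 15))

print(inside_op == inside_fn, inside_op)
print(outside_op == outside_fn, outside_op)
```

The parentheses around each comparison matter in the operator form, because `&` and `|` bind more tightly than `>` and `<` in Python.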
# Select a single column for the matching row
query = select(heroes_table.columns.hero_id).where(heroes_table.columns.name == 'Jade')
# Pattern match: names starting with 'A'
query = select(heroes_table.columns.name).where(heroes_table.columns.name.like('A%'))
# Aggregates; battle_events_table is defined in the joins section below
query = select(func.sum(battle_events_table.columns.rubies_gained))
results = connection.execute(query)
# scalar() returns the first column of the first row
results.scalar()
query = select(func.max(battle_events_table.columns.rubies_gained))
query = select(func.min(battle_events_table.columns.rubies_gained))
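
As a runnable illustration of the aggregate functions, the sketch below uses a simplified, hypothetical version of `battle_events` with only two columns and three made-up `rubies_gained` values. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database.

```python
from sqlalchemy import (Column, Integer, MetaData, Table,
                        create_engine, func, insert, select)

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()
# Simplified, hypothetical table: only the columns needed for the aggregates.
battle_events_table = Table(
    "battle_events", metadata,
    Column("battle_event_id", Integer, primary_key=True),
    Column("rubies_gained", Integer, nullable=False),
)
metadata.create_all(engine)

rubies = battle_events_table.columns.rubies_gained

with engine.connect() as connection:
    connection.execute(
        insert(battle_events_table),
        [{"rubies_gained": n} for n in (10, 25, 5)],  # made-up values
    )
    # One query can return several aggregates; each becomes a column
    # of the single result row.
    query = select(func.sum(rubies), func.max(rubies), func.min(rubies))
    total, highest, lowest = connection.execute(query).fetchone()

print(total, highest, lowest)  # 40 25 5
```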

Examples of Joining multiple tables:

metadata.remove(heroes_table)

battle_event_types_table = Table('battle_event_types', metadata,
    Column('battle_event_type_id', Integer, nullable=False, primary_key=True),
    Column('name', String, nullable=False),
)


battle_events_table = Table('battle_events', metadata,
    Column('battle_event_id', Integer, nullable=False, primary_key=True),
    Column('battle_participant_id', Integer, ForeignKey('battle_participants.battle_participant_id'), nullable=False),
    Column('battle_event_type_id', Integer, ForeignKey('battle_event_types.battle_event_type_id'), nullable=False),
    Column('rubies_gained', Integer, nullable=False),
    Column('timestamp', Integer, nullable=False)
)

battle_participants_table = Table('battle_participants', metadata,
    Column('battle_participant_id', Integer, nullable=False, primary_key=True),
    Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
    Column('battle_id', Integer, ForeignKey('battles.battle_id'), nullable=False),
    Column('hero_id', Integer, ForeignKey('heroes.hero_id'), nullable=False)
)

heroes_table = Table('heroes', metadata,
                    Column('hero_id', Integer, nullable=False, primary_key=True),
                    Column('name', String(20), nullable=False, unique=True),
)

join = battle_participants_table\
    .join(heroes_table, \
         battle_participants_table.columns.hero_id == heroes_table.columns.hero_id)\
    .join(battle_events_table, \
         battle_participants_table.columns.battle_participant_id == battle_events_table.columns.battle_participant_id)\
    .join(battle_event_types_table, \
         battle_events_table.columns.battle_event_type_id == battle_event_types_table.columns.battle_event_type_id)

# Count HERO_KILL events per hero, highest count first
query = select(
    heroes_table.columns.name,
    func.count(battle_events_table.columns.battle_event_id).label('total_kills')
)\
    .select_from(join)\
    .where(battle_event_types_table.columns.name == 'HERO_KILL')\
    .group_by(heroes_table.columns.name)\
    .order_by(desc('total_kills'))

results = connection.execute(query)
results.fetchall()
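
To try the join without the original database, here is a self-contained sketch. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database. The tables are trimmed to the columns the query needs, and every row, including the `TOWER_HIT` event type, is made up for the example.

```python
from sqlalchemy import (Column, ForeignKey, Integer, MetaData, String, Table,
                        create_engine, desc, func, insert, select)

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()

heroes_table = Table(
    "heroes", metadata,
    Column("hero_id", Integer, primary_key=True),
    Column("name", String(20), nullable=False, unique=True),
)
battle_event_types_table = Table(
    "battle_event_types", metadata,
    Column("battle_event_type_id", Integer, primary_key=True),
    Column("name", String(20), nullable=False),
)
battle_participants_table = Table(
    "battle_participants", metadata,
    Column("battle_participant_id", Integer, primary_key=True),
    Column("hero_id", Integer, ForeignKey("heroes.hero_id"), nullable=False),
)
battle_events_table = Table(
    "battle_events", metadata,
    Column("battle_event_id", Integer, primary_key=True),
    Column("battle_participant_id", Integer,
           ForeignKey("battle_participants.battle_participant_id"), nullable=False),
    Column("battle_event_type_id", Integer,
           ForeignKey("battle_event_types.battle_event_type_id"), nullable=False),
)
metadata.create_all(engine)

# Chain the joins: participants -> heroes -> events -> event types.
join = (
    battle_participants_table
    .join(heroes_table,
          battle_participants_table.columns.hero_id == heroes_table.columns.hero_id)
    .join(battle_events_table,
          battle_participants_table.columns.battle_participant_id
          == battle_events_table.columns.battle_participant_id)
    .join(battle_event_types_table,
          battle_events_table.columns.battle_event_type_id
          == battle_event_types_table.columns.battle_event_type_id)
)
query = (
    select(heroes_table.columns.name,
           func.count(battle_events_table.columns.battle_event_id).label("total_kills"))
    .select_from(join)
    .where(battle_event_types_table.columns.name == "HERO_KILL")
    .group_by(heroes_table.columns.name)
    .order_by(desc("total_kills"))
)

with engine.connect() as connection:
    # Made-up data: Jade has two kills; Abel has one kill and one other event.
    connection.execute(insert(heroes_table),
                       [{"hero_id": 1, "name": "Jade"}, {"hero_id": 2, "name": "Abel"}])
    connection.execute(insert(battle_event_types_table),
                       [{"battle_event_type_id": 1, "name": "HERO_KILL"},
                        {"battle_event_type_id": 2, "name": "TOWER_HIT"}])
    connection.execute(insert(battle_participants_table),
                       [{"battle_participant_id": 1, "hero_id": 1},
                        {"battle_participant_id": 2, "hero_id": 2}])
    connection.execute(insert(battle_events_table),
                       [{"battle_participant_id": 1, "battle_event_type_id": 1},
                        {"battle_participant_id": 1, "battle_event_type_id": 1},
                        {"battle_participant_id": 2, "battle_event_type_id": 1},
                        {"battle_participant_id": 2, "battle_event_type_id": 2}])
    rows = [tuple(row) for row in connection.execute(query)]

print(rows)  # [('Jade', 2), ('Abel', 1)]
```

Abel's `TOWER_HIT` event is filtered out by the `where` clause before the rows are grouped, so it does not count toward `total_kills`.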
