🗂️ SQL Connectors

Meerschaum's first-class connector is the SQLConnector. Several built-in SQL connectors are defined by default:

  • sql:main (default): The pre-configured TimescaleDB instance included in the Meerschaum Stack. It corresponds to a database running on localhost and is therefore shared amongst environments.
    URI: postgresql+psycopg://mrsm:mrsm@localhost:5432/meerschaum
  • sql:local: A SQLite file within the Meerschaum root directory. Because sql:local is contained in the root directory, it is isolated between environments.
    URI: sqlite:///$MRSM_ROOT_DIR/sqlite/mrsm_local.db
  • sql:memory: An in-memory SQLite database. This is not persistent and is isolated per-process.
    URI: sqlite:///:memory:

Add new connectors with the bootstrap connectors action or by setting environment variables.
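
For example, connectors can be defined through environment variables of the form MRSM_SQL_<LABEL>. The sketch below registers a hypothetical sql:foo connector from a URI; it assumes the variable is visible before Meerschaum loads its configuration (e.g. exported in your shell or set before importing meerschaum).

import os

### Hypothetical `sql:foo` connector defined from a URI.
### Assumption: `MRSM_SQL_<LABEL>` variables are read when Meerschaum
### loads its configuration, so set them before importing `meerschaum`.
os.environ['MRSM_SQL_FOO'] = 'postgresql+psycopg://user:pass@localhost:5432/db'

import meerschaum as mrsm
conn = mrsm.get_connector('sql:foo')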

Supported Flavors

The following database flavors are confirmed to be feature-complete through the Meerschaum test suite and are listed in descending order of compatibility and performance.

  • TimescaleDB
  • PostgreSQL
  • Citus
  • SQLite
  • MariaDB
  • MySQL 5.7+
  • DuckDB
  • Microsoft SQL Server
  • Oracle SQL
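
Connectors for any of these flavors are built the same way. Here is a minimal sketch for a throwaway SQLite connector, assuming get_connector() accepts flavor-specific keyword arguments such as flavor and database:

import meerschaum as mrsm

### A hypothetical `sql:tmp` connector.
### Assumption: `flavor` and `database` are accepted as keyword arguments.
conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/tmp.db')
df = conn.read('SELECT 1 AS foo')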

In-place Syncs

When a pipe's fetch and instance connectors are the same, syncing occurs entirely within the database through SQL, making this a high-performance way to incrementally materialize views.

In-place syncing example
import meerschaum as mrsm

weather_pipe = mrsm.Pipe(
    'plugin:noaa', 'weather', 'atl',
    columns = {
        'datetime': 'timestamp',
        'id': 'station',
    },
    parameters = {
        'noaa': {
            'stations': ['KATL'],
        },
    },
)
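
### Sync the source pipe so its target table exists on `sql:main`
### (assumed necessary in a fresh environment; skip if it's already synced).
weather_pipe.sync()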
inplace_pipe = mrsm.Pipe(
    'sql:main', 'weather_avg', 'atl',
    columns = {
        'datetime': 'day',
        'station': 'station',
    },
    parameters = {
        'sql': f"""
            SELECT
                TIME_BUCKET('1 day', timestamp) AS "day",
                station,
                AVG("temperature (degC)") AS avg_temp
            FROM "{weather_pipe.target}"
            GROUP BY "day", station
        """,
    },
)

### Because the input and output connectors are both `sql:main`,
### syncing occurs entirely in SQL and nothing is loaded into RAM.
success, msg = inplace_pipe.sync()

df = inplace_pipe.get_data()
df
#           day station   avg_temp
# 0  2023-11-22    KATL       12.2
# 1  2023-11-24    KATL  15.916667
# 2  2023-11-25    KATL  10.579167
# 3  2023-11-26    KATL       9.01
# 4  2023-11-27    KATL   7.951852
# ..        ...     ...        ...
# 58 2024-05-21    KATL  25.208696
# 59 2024-05-22    KATL  25.185714
# 60 2024-05-23    KATL  26.004545
# 61 2024-05-24    KATL  26.479167
# 62 2024-05-25    KATL       25.0
# 
# [63 rows x 3 columns]

Utility Functions

If you work closely with relational databases, you may find the SQLConnector very useful. See below for several handy functions that Meerschaum provides:

SQLConnector Methods

read()

Pass the name of a table or a SQL query into SQLConnector.read() to fetch a Pandas DataFrame.

import meerschaum as mrsm
conn = mrsm.get_connector("sql:main")
df = conn.read('SELECT 1 AS foo')
df
#    foo
# 0    1
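
You can also filter a table with a params dictionary instead of writing the WHERE clause yourself. This is a minimal sketch, assuming read() accepts the same params filter format used elsewhere in Meerschaum (the table name comes from the in-place example above):

### Assumption: `params` is translated into a WHERE clause for you.
df = conn.read('sql_main_weather_avg_atl', params={'station': 'KATL'})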

read() also supports server-side cursors, allowing you to efficiently stream chunks from the result set:

### Set `as_iterator=True` to return a dataframe generator.
table = 'sql_main_weather_avg_atl'
chunks = conn.read(table, chunksize=25, as_iterator=True)
for chunk in chunks:
    print(f"Loaded {len(chunk)} rows.")

# Loaded 25 rows.
# Loaded 25 rows.
# Loaded 13 rows.

to_sql()

A wrapper around pandas.DataFrame.to_sql() for persisting your dataframes directly to tables.

import meerschaum as mrsm
import pandas as pd

conn = mrsm.get_connector('sql:main')
df = pd.DataFrame([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
results = conn.to_sql(df, 'foo', as_dict=True)

mrsm.pprint(results)
# {
#     'target': 'foo',
#     'method': "functools.partial(<function psql_insert_copy at 0x7f821594d800>, schema='public')",
#     'chunksize': 100000,
#     'num_rows': 2,
#     'start': 203452.165351391,
#     'end': 203452.255752858,
#     'duration': 0.09040146699408069,
#     'success': True,
#     'msg': 'It took 0.09 seconds to sync 2 rows to foo.'
# }
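
Because to_sql() wraps pandas.DataFrame.to_sql(), familiar keyword arguments should pass through. A sketch, assuming if_exists='append' behaves as it does in pandas:

### Assumption: `if_exists='append'` adds rows instead of replacing the table.
more_rows = pd.DataFrame([{'a': 5, 'b': 6}])
result = conn.to_sql(more_rows, 'foo', if_exists='append')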

exec()

Execute SQL queries directly and return the SQLAlchemy result. This is useful for queries without result sets, like DROP, ALTER, CREATE, UPDATE, INSERT, etc., as well as executing stored procedures.

import meerschaum as mrsm
conn = mrsm.get_connector('sql:main')
_ = conn.exec('DROP TABLE IF EXISTS foo')
_ = conn.exec("CREATE TABLE foo (bar INT)")

result = conn.exec("INSERT INTO foo (bar) VALUES (1), (2)")
print(f"Inserted {result.rowcount} rows.")
# Inserted 2 rows.

exec_queries()

A safer option for executing multiple queries is to pass a list to SQLConnector.exec_queries(). Setting break_on_error=True rolls back the transaction if any of the provided queries fails.

import meerschaum as mrsm
conn = mrsm.get_connector('sql:main')
conn.exec('DROP TABLE IF EXISTS foo')

### Transaction should fail and roll back,
### meaning `foo` will not be created.
queries = [
    'CREATE TABLE foo (bar INT)',
    'CREATE TABLE foo (a INT, b INT)',
]
results = conn.exec_queries(queries, break_on_error=True, silent=True)
success = len(results) == len(queries)
assert not success

from meerschaum.utils.sql import table_exists
assert not table_exists('foo', conn)

meerschaum.utils.sql Functions

🚧 This section is still under construction; more code snippets will be added soon!

build_where()

Build a WHERE clause based on the params filter.

import meerschaum as mrsm
from meerschaum.utils.sql import build_where

conn = mrsm.get_connector('sql:main')
print(build_where({'foo': [1, 2, 3]}, conn))
# 
# WHERE
#     "foo" IN ('1', '2', '3')

print(build_where({'foo': ['_3', '_4']}, conn))
# 
# WHERE
#     ("foo" NOT IN ('3', '4'))
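
The returned clause may be appended directly to a query, for example when reading with a filter (hypothetical table foo with an integer column bar):

### Hypothetical table `foo` with an integer column `bar`.
query = "SELECT * FROM foo" + build_where({'bar': [1, 2]}, conn)
df = conn.read(query)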

wrap_query_with_cte()

Wrap a subquery in a CTE and append an encapsulating query.

from meerschaum.utils.sql import wrap_query_with_cte
sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo"
parent_query = "SELECT newval * 3 FROM src"
query = wrap_query_with_cte(sub_query, parent_query, 'mssql')
print(query)
# WITH foo AS (SELECT 1 AS val),
# [src] AS (
#     SELECT (val * 2) AS newval FROM foo
# )
# SELECT newval * 3 FROM src

table_exists()

Check if a table exists.

import pandas as pd
import meerschaum as mrsm

df = pd.DataFrame([{'a': 1}])
conn = mrsm.get_connector('sql:main')
conn.to_sql(df, 'foo')

from meerschaum.utils.sql import table_exists
print(table_exists('foo', conn))
# True

conn.exec("DROP TABLE foo")
print(table_exists('foo', conn))
# False

get_sqlalchemy_table()

Return a SQLAlchemy table object.

import pandas as pd
import meerschaum as mrsm

df = pd.DataFrame([{'a': 1}])
conn = mrsm.get_connector('sql:main')
conn.to_sql(df, 'foo')

from meerschaum.utils.sql import get_sqlalchemy_table
table = get_sqlalchemy_table('foo', conn)
print(table.columns)
# {'a': Column('a', BIGINT(), table=<foo>)}
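
The returned object is a regular SQLAlchemy Table, so it can be used to compose queries. A sketch, assuming read() accepts SQLAlchemy selectables as well as strings:

import sqlalchemy

### Build a query against the reflected table and read it back.
### Assumption: `read()` accepts SQLAlchemy selectables.
query = sqlalchemy.select(table).where(table.c.a == 1)
df = conn.read(query)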

get_table_cols_types()

Return a dictionary mapping a table's columns to their data types, even within a session that has not yet been committed.

import pandas as pd
import meerschaum as mrsm

df = pd.DataFrame([{'a': 1}])
conn = mrsm.get_connector('sql:main')
conn.to_sql(df, 'foo')

from meerschaum.utils.sql import get_table_cols_types
cols_types = get_table_cols_types('foo', conn)
print(cols_types)
# {'a': 'BIGINT'}