🗂️ SQL Connectors
Meerschaum's first-class connector is the SQLConnector. Several built-in sql connectors are defined by default:
| Connector | Description | URI |
|---|---|---|
| sql:main (default) | The pre-configured TimescaleDB instance included in the Meerschaum Stack. It corresponds to a database running on localhost and is therefore shared amongst environments. | postgresql+psycopg://mrsm:mrsm@localhost:5432/meerschaum |
| sql:local | A SQLite file within the Meerschaum root directory. Because sql:local is contained in the root directory, it is isolated between environments. | sqlite:///$MRSM_ROOT_DIR/sqlite/mrsm_local.db |
| sql:memory | An in-memory SQLite database. This is not persistent and is isolated per-process. | sqlite:///:memory: |
Add new connectors with bootstrap connectors or by setting environment variables.
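As a rough illustration of what a connector URI encodes, the sketch below splits the sql:main URI listed above into its parts using only Python's standard library. The environment variable name `MRSM_SQL_EXAMPLE` and the label `example` are assumptions for illustration (following an assumed `MRSM_<TYPE>_<LABEL>` naming pattern); consult the Meerschaum documentation for the exact convention.

```python
import os
from urllib.parse import urlsplit

# Hypothetical label "example"; the MRSM_SQL_<LABEL> pattern is an assumption.
os.environ["MRSM_SQL_EXAMPLE"] = (
    "postgresql+psycopg://mrsm:mrsm@localhost:5432/meerschaum"
)

# Split the URI into the pieces a SQL connector needs.
parts = urlsplit(os.environ["MRSM_SQL_EXAMPLE"])
connector_attrs = {
    "flavor_and_driver": parts.scheme,
    "username": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "database": parts.path.lstrip("/"),
}
print(connector_attrs)
```

Setting the variable from Python only affects the current process; exporting it in the shell before starting Meerschaum is the more typical approach.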
Supported Flavors
The following database flavors are confirmed to be feature-complete through the Meerschaum test suite and are listed in descending order of compatibility and performance.
- TimescaleDB
- PostgreSQL
- Citus
- SQLite
- MariaDB
- MySQL 5.7+
- DuckDB
- Microsoft SQL Server
- Oracle SQL
In-place Syncs
When a pipe has the same fetch and instance connectors, syncing will occur entirely within the database context through SQL. As such, this is a high-performance method to incrementally materialize views.
In-place syncing example
| import meerschaum as mrsm

weather_pipe = mrsm.Pipe(
    'plugin:noaa', 'weather', 'atl',
    columns = {
        'datetime': 'timestamp',
        'id': 'station',
    },
    parameters = {
        'noaa': {
            'stations': ['KATL'],
        },
    },
)

inplace_pipe = mrsm.Pipe(
    'sql:main', 'weather_avg', 'atl',
    columns = {
        'datetime': 'day',
        'station': 'station',
    },
    parameters = {
        'sql': f"""
            SELECT
                TIME_BUCKET('1 day', timestamp) AS "day",
                station,
                AVG("temperature (degC)") AS avg_temp
            FROM "{weather_pipe.target}"
            GROUP BY "day", station
        """,
    },
)

### Because the input and output connectors are both `sql:main`,
### syncing occurs entirely in SQL and nothing is loaded into RAM.
success, msg = inplace_pipe.sync()
df = inplace_pipe.get_data()
df
# day station avg_temp
# 0 2023-11-22 KATL 12.2
# 1 2023-11-24 KATL 15.916667
# 2 2023-11-25 KATL 10.579167
# 3 2023-11-26 KATL 9.01
# 4 2023-11-27 KATL 7.951852
# .. ... ... ...
# 58 2024-05-21 KATL 25.208696
# 59 2024-05-22 KATL 25.185714
# 60 2024-05-23 KATL 26.004545
# 61 2024-05-24 KATL 26.479167
# 62 2024-05-25 KATL 25.0
#
# [63 rows x 3 columns]
|
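To illustrate the general idea of an in-place sync without a running Meerschaum stack, the sketch below uses Python's built-in sqlite3 module. The aggregation is materialized with a single INSERT ... SELECT, so no rows are fetched into Python during the sync. The table names (`weather`, `weather_avg`), the helper `sync_in_place`, and the sample values are hypothetical, and this is not Meerschaum's actual sync logic: for simplicity it recomputes every bucket rather than only the new ones.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE weather (ts TEXT, station TEXT, temp REAL);
    CREATE TABLE weather_avg (
        day TEXT, station TEXT, avg_temp REAL,
        PRIMARY KEY (day, station)
    );
    INSERT INTO weather VALUES
        ('2024-05-24 01:00:00', 'KATL', 20.0),
        ('2024-05-24 13:00:00', 'KATL', 30.0),
        ('2024-05-25 01:00:00', 'KATL', 25.0);
""")

def sync_in_place(conn):
    """Upsert daily averages entirely inside the database (hypothetical helper)."""
    with conn:  # commit on success, roll back on error
        conn.execute("""
            INSERT OR REPLACE INTO weather_avg (day, station, avg_temp)
            SELECT DATE(ts) AS day, station, AVG(temp) AS avg_temp
            FROM weather
            GROUP BY DATE(ts), station
        """)

sync_in_place(conn)

# New source data arrives; a second sync updates the affected bucket.
conn.execute("INSERT INTO weather VALUES ('2024-05-25 13:00:00', 'KATL', 27.0)")
sync_in_place(conn)

rows = conn.execute(
    "SELECT day, station, avg_temp FROM weather_avg ORDER BY day"
).fetchall()
print(rows)
# [('2024-05-24', 'KATL', 25.0), ('2024-05-25', 'KATL', 26.0)]
```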
Utility Functions
If you work closely with relational databases, you may find the SQLConnector very useful. See below for several handy functions that Meerschaum provides:
Pass the name of a table or a SQL query into SQLConnector.read() to fetch a pandas DataFrame.
| import meerschaum as mrsm
conn = mrsm.get_connector("sql:main")
df = conn.read('SELECT 1 AS foo')
df
# foo
# 0 1
|
read() also supports server-side cursors, allowing you to efficiently stream chunks from the result set:
| ### Set `as_iterator=True` to return a dataframe generator.
table = 'sql_main_weather_avg_atl'
chunks = conn.read(table, chunksize=25, as_iterator=True)
for chunk in chunks:
    print(f"Loaded {len(chunk)} rows.")
# Loaded 25 rows.
# Loaded 25 rows.
# Loaded 13 rows.
|
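The chunked consumption pattern can be sketched with the standard library alone. The generator below is a hypothetical helper (`iter_chunks` is not a Meerschaum function) that pulls rows from a sqlite3 cursor in fixed-size batches. SQLite has no server-side cursors, so this only mirrors how the chunks are consumed, not where they are buffered.

```python
import sqlite3

def iter_chunks(cursor, chunksize):
    """Yield lists of at most `chunksize` rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            break
        yield rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(63)])

# Stream the 63 rows in batches of 25 instead of loading them all at once.
cursor = conn.execute("SELECT n FROM t ORDER BY n")
chunk_sizes = [len(chunk) for chunk in iter_chunks(cursor, chunksize=25)]
print(chunk_sizes)
# [25, 25, 13]
```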
SQLConnector.to_sql() is a wrapper around pandas.DataFrame.to_sql(). Use it to persist your dataframes directly to tables.
| import meerschaum as mrsm
import pandas as pd
conn = mrsm.get_connector('sql:main')
df = pd.DataFrame([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
results = conn.to_sql(df, 'foo', as_dict=True)
mrsm.pprint(results)
# {
# 'target': 'foo',
# 'method': "functools.partial(<function psql_insert_copy at 0x7f821594d800>, schema='public')",
# 'chunksize': 100000,
# 'num_rows': 2,
# 'start': 203452.165351391,
# 'end': 203452.255752858,
# 'duration': 0.09040146699408069,
# 'success': True,
# 'msg': 'It took 0.09 seconds to sync 2 rows to foo.'
# }
|
SQLConnector.exec() executes SQL queries directly and returns the SQLAlchemy result. This is useful for queries without result sets, like DROP, ALTER, CREATE, UPDATE, INSERT, etc., as well as executing stored procedures.
| import meerschaum as mrsm
conn = mrsm.get_connector('sql:main')
_ = conn.exec('DROP TABLE IF EXISTS foo')
_ = conn.exec("CREATE TABLE foo (bar INT)")
result = conn.exec("INSERT INTO foo (bar) VALUES (1), (2)")
print(f"Inserted {result.rowcount} rows.")
# Inserted 2 rows.
|
A safer way to execute multiple queries is to pass a list to SQLConnector.exec_queries(). With the flag break_on_error set, the transaction is rolled back if any of the provided queries fails.
| import meerschaum as mrsm
conn = mrsm.get_connector('sql:main')
conn.exec('DROP TABLE IF EXISTS foo')
### Transaction should fail and roll back,
### meaning `foo` will not be created.
queries = [
    'CREATE TABLE foo (bar INT)',
    'CREATE TABLE foo (a INT, b INT)',
]
results = conn.exec_queries(queries, break_on_error=True, silent=True)
success = len(results) == len(queries)
assert not success
from meerschaum.utils.sql import table_exists
assert not table_exists('foo', conn)
|
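The all-or-nothing behavior described above can be illustrated with Python's built-in sqlite3 module. `run_all_or_nothing` is a hypothetical helper, not part of Meerschaum, and the sketch assumes a database with transactional DDL (which SQLite has).

```python
import sqlite3

# isolation_level=None disables implicit transactions,
# so BEGIN / COMMIT / ROLLBACK are issued explicitly below.
conn = sqlite3.connect(":memory:", isolation_level=None)

def run_all_or_nothing(conn, queries):
    """Run queries in one transaction; roll everything back if any query fails."""
    conn.execute("BEGIN")
    try:
        for query in queries:
            conn.execute(query)
    except sqlite3.Error:
        conn.execute("ROLLBACK")
        return False
    conn.execute("COMMIT")
    return True

queries = [
    "CREATE TABLE foo (bar INT)",
    "CREATE TABLE foo (a INT, b INT)",  # fails: foo already exists
]
success = run_all_or_nothing(conn, queries)

# The first CREATE TABLE was rolled back along with the failed one.
foo_exists = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'foo'"
).fetchone()[0] > 0
print(success, foo_exists)
# False False
```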
🚧 This section is still under construction ― code snippets will be added soon!
build_where() builds a WHERE clause based on the params filter.
| import meerschaum as mrsm
from meerschaum.utils.sql import build_where
conn = mrsm.get_connector('sql:main')
print(build_where({'foo': [1, 2, 3]}, conn))
#
# WHERE
# "foo" IN ('1', '2', '3')
print(build_where({'foo': ['_3', '_4']}, conn))
#
# WHERE
# ("foo" NOT IN ('3', '4'))
|
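To make the filter semantics concrete, here is a simplified, hypothetical re-implementation (`simple_where` is not part of Meerschaum) covering only the two behaviors shown above: list values become IN clauses, and values prefixed with an underscore are negated. It assumes double-quoted identifiers and single-quoted values, and it skips the escaping and flavor-specific handling a real implementation needs.

```python
def simple_where(params, negation_prefix="_"):
    """Build a WHERE clause from a {column: values} filter (illustration only)."""
    clauses = []
    for column, values in params.items():
        if not isinstance(values, (list, tuple)):
            values = [values]
        as_text = [str(value) for value in values]
        included = [v for v in as_text if not v.startswith(negation_prefix)]
        excluded = [
            v[len(negation_prefix):]
            for v in as_text
            if v.startswith(negation_prefix)
        ]
        # No escaping is done here: never interpolate untrusted values like this.
        if included:
            quoted = ", ".join(f"'{v}'" for v in included)
            clauses.append(f'"{column}" IN ({quoted})')
        if excluded:
            quoted = ", ".join(f"'{v}'" for v in excluded)
            clauses.append(f'"{column}" NOT IN ({quoted})')
    if not clauses:
        return ""
    return "WHERE " + " AND ".join(clauses)

print(simple_where({'foo': [1, 2, 3]}))
# WHERE "foo" IN ('1', '2', '3')
print(simple_where({'foo': ['_3', '_4']}))
# WHERE "foo" NOT IN ('3', '4')
```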
wrap_query_with_cte() wraps a subquery in a CTE and appends an encapsulating query.
| from meerschaum.utils.sql import wrap_query_with_cte
sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo"
parent_query = "SELECT newval * 3 FROM src"
query = wrap_query_with_cte(sub_query, parent_query, 'mssql')
print(query)
# WITH foo AS (SELECT 1 AS val),
# [src] AS (
# SELECT (val * 2) AS newval FROM foo
# )
# SELECT newval * 3 FROM src
|
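For intuition, the simplest case of this wrapping can be sketched in a few lines. `wrap_simple_cte` is a hypothetical helper that only handles a plain subquery; unlike the function shown above, it does not merge a subquery that already begins with its own WITH clause, and it does no flavor-specific quoting.

```python
import sqlite3

def wrap_simple_cte(sub_query, parent_query, cte_name="src"):
    """Wrap a plain subquery (no leading WITH) in a CTE named `cte_name`."""
    return f"WITH {cte_name} AS (\n    {sub_query}\n)\n{parent_query}"

query = wrap_simple_cte(
    "SELECT (1 * 2) AS newval",
    "SELECT newval * 3 AS result FROM src",
)
print(query)

# Run the combined query against an in-memory SQLite database.
result = sqlite3.connect(":memory:").execute(query).fetchone()[0]
print(result)
# 6
```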
table_exists() checks whether a table exists.
| import pandas as pd
import meerschaum as mrsm
df = pd.DataFrame([{'a': 1}])
conn = mrsm.get_connector('sql:main')
conn.to_sql(df, 'foo')
from meerschaum.utils.sql import table_exists
print(table_exists('foo', conn))
# True
conn.exec("DROP TABLE foo")
print(table_exists('foo', conn))
# False
|
get_sqlalchemy_table() returns a SQLAlchemy table object.
| import pandas as pd
import meerschaum as mrsm
df = pd.DataFrame([{'a': 1}])
conn = mrsm.get_connector('sql:main')
conn.to_sql(df, 'foo')
from meerschaum.utils.sql import get_sqlalchemy_table
table = get_sqlalchemy_table('foo', conn)
print(table.columns)
# {'a': Column('a', BIGINT(), table=<foo>)}
|
get_table_cols_types() returns a dictionary mapping a table's columns to data types, even during a not-yet-committed session.
| import pandas as pd
import meerschaum as mrsm
df = pd.DataFrame([{'a': 1}])
conn = mrsm.get_connector('sql:main')
conn.to_sql(df, 'foo')
from meerschaum.utils.sql import get_table_cols_types
cols_types = get_table_cols_types('foo', conn)
print(cols_types)
# {'a': 'BIGINT'}
|
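As a point of comparison, the same kind of column-to-type mapping can be read straight from SQLite's catalog with the standard library. `sqlite_cols_types` is a hypothetical helper for illustration and is specific to SQLite; it is not how Meerschaum queries other flavors.

```python
import sqlite3

def sqlite_cols_types(conn, table):
    """Map column names to their declared types for a SQLite table."""
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
    rows = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
    return {row[1]: row[2] for row in rows}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE foo (a BIGINT, b TEXT)")

cols_types = sqlite_cols_types(conn, "foo")
print(cols_types)
# {'a': 'BIGINT', 'b': 'TEXT'}
```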