Out-of-the-box ETL, easy to learn, and a pleasure to use!

What is Meerschaum?

Meerschaum is a platform for quickly creating and managing time-series data streams called pipes. With Meerschaum, you can have a data visualization stack running in minutes.

Why Meerschaum?

Two words: Incremental updates. Fetch the data you need, and Meerschaum will handle the rest.

If you've worked with time-series data, you know the headaches that come with ETL. Meerschaum is a system that makes consolidating and syncing data easy.

Rather than building sync logic from scratch, you can use Meerschaum's tools to define and sync your data streams. And don't worry: you can always incorporate Meerschaum into your existing systems.


✨ Features

Organize ETL Processes into Pipes

Meerschaum Pipes are parametrized ETL processes that are tagged and organized into hierarchies to make scaling up a breeze.


Robust Plugin System

Plugins make it easy to ingest any data source, add functionality to Meerschaum, and organize your utility scripts.

Clean Connector Management

Define your connectors at any level: through the CLI, in your environment, or dynamically.

### You can follow an interactive wizard.
mrsm bootstrap connector

### Or define connectors in your environment.
export MRSM_SQL_BAZ='postgresql://foo:bar@localhost:5432/baz'
export MRSM_MONGODB_LOCAL='{
  "uri": "mongodb://localhost:27017",
  "database": "meerschaum"
}'

### Or you can build connectors on-the-fly in code.
import meerschaum as mrsm
conn = mrsm.get_connector('sql:demo', flavor='sqlite', database='/tmp/demo.db')
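
To make the environment-variable convention above concrete, here is a small sketch of how a variable such as MRSM_SQL_BAZ could be split into connector keys and attributes. It is an illustration only, not Meerschaum's actual parser: the function name parse_connector_env is hypothetical, and it assumes the value is either a URI string or a JSON object, as in the two exports shown above.

```python
import json


def parse_connector_env(name: str, value: str) -> dict:
    """Hypothetical helper: interpret a MRSM_<TYPE>_<LABEL> variable."""
    prefix = "MRSM_"
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")

    # Split on the first underscore only, so labels may contain underscores.
    conn_type, _, label = name[len(prefix):].partition("_")
    if not conn_type or not label:
        raise ValueError(f"{name!r} is missing a type or a label")

    stripped = value.strip()
    if stripped.startswith("{"):
        # JSON object: every key becomes a connector attribute.
        attributes = json.loads(stripped)
    else:
        # Anything else is assumed to be a connection URI.
        attributes = {"uri": stripped}

    return {
        "keys": f"{conn_type.lower()}:{label.lower()}",
        "attributes": attributes,
    }


parsed = parse_connector_env(
    "MRSM_SQL_BAZ", "postgresql://foo:bar@localhost:5432/baz"
)
print(parsed["keys"])
```

Under these assumptions, the first export above would correspond to the keys sql:baz and the second to mongodb:local.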

Design, Develop, and Deploy with Meerschaum Compose

The compose workflow allows you to iterate with and version-control your pipes, making collaboration and maintainability much smoother.

Performant SQL Transformations

SQL pipes with the same instance and source connector are synced in place: deltas are resolved entirely through SQL, and nothing is loaded into RAM.
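
The general idea of resolving a delta inside the database can be sketched with Python's built-in sqlite3 module. This is not the SQL that Meerschaum generates; the table names (source, target) and the helper sync_in_place are made up for the illustration, and the sketch only handles new rows, not changed ones. The point is that the database works out which rows are missing and inserts them itself, so no rows pass through Python.

```python
import sqlite3


def sync_in_place(conn: sqlite3.Connection) -> int:
    """Insert rows from `source` that `target` lacks; return how many."""
    before = conn.total_changes
    conn.execute(
        """
        INSERT INTO target (dt, id, val)
        SELECT s.dt, s.id, s.val
        FROM source AS s
        LEFT JOIN target AS t
          ON t.dt = s.dt AND t.id = s.id
        WHERE t.id IS NULL  -- keep only rows with no match in target
        """
    )
    conn.commit()
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE source (dt TEXT, id INTEGER, val REAL);
    CREATE TABLE target (dt TEXT, id INTEGER, val REAL);
    INSERT INTO source VALUES ('2023-01-01', 1, 123.4), ('2023-01-01', 2, 567.8);
    INSERT INTO target VALUES ('2023-01-01', 1, 123.4);
    """
)

print(sync_in_place(conn))  # 1: only the row with id 2 was missing
print(sync_in_place(conn))  # 0: a second run finds nothing new
```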

Concurrency at its Finest

Maximize your throughput by syncing multiple pipes in parallel.
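
As a rough illustration of syncing in parallel, the sketch below runs several independent sync jobs on a thread pool from the standard library. The sync_one function is a stand-in that sleeps briefly and returns a made-up row count; in real use it would wrap a call such as pipe.sync(). Nothing here reflects how Meerschaum schedules work internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sync_one(name: str) -> tuple[str, int]:
    """Stand-in for one pipe's sync: simulate I/O, then report a row count."""
    time.sleep(0.1)
    return name, len(name)  # placeholder "rows synced" value


def sync_all(names: list[str], workers: int = 4) -> dict[str, int]:
    """Run every sync on a thread pool and collect the results by name."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map preserves input order, so results line up with `names`.
        return dict(pool.map(sync_one, names))


results = sync_all(["weather", "sales", "sensors"])
print(results)
```

Threads suit this kind of workload because syncing is typically dominated by waiting on network and database I/O rather than CPU.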

Elegant Chunking

Calm your out-of-memory fears with automatic, parallelized chunking.
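
Chunking, in general, means processing a large result in bounded pieces so that memory use stays flat. The generator below is a plain-Python sketch of that idea, not Meerschaum's implementation; the name iter_chunks and the chunk size are illustrative.

```python
from itertools import islice
from typing import Iterable, Iterator


def iter_chunks(rows: Iterable[dict], chunksize: int) -> Iterator[list[dict]]:
    """Yield successive lists of at most `chunksize` rows."""
    if chunksize < 1:
        raise ValueError("chunksize must be at least 1")
    iterator = iter(rows)
    while True:
        # islice pulls only the next `chunksize` rows from the iterator.
        chunk = list(islice(iterator, chunksize))
        if not chunk:
            return
        yield chunk


rows = ({"id": i} for i in range(10))  # a lazy source of ten rows
sizes = [len(chunk) for chunk in iter_chunks(rows, 4)]
print(sizes)  # [4, 4, 2]
```

Because each chunk is an independent list, chunks produced this way could also be handed to a worker pool to be processed in parallel.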

Simple-Yet-Powerful API

Want to use Meerschaum in your code? Check out the package documentation!

import meerschaum as mrsm
conn = mrsm.get_connector("sql:demo", uri="sqlite:////tmp/demo.db")

pipe = mrsm.Pipe(
    'foo', 'bar',
    instance = conn,
    columns = {'datetime': 'dt', 'id': 'id'},
    dtypes = {'attrs': 'json'},
)
docs = [
    {'dt': '2023-01-01', 'id': 1, 'val': 123.4},
    {'dt': '2023-01-01', 'id': 2, 'val': 567.8},
]
pipe.sync(docs)

docs = [
    {'dt': '2023-01-01', 'id': 1, 'attrs': {'foo': 'bar'}},
]
pipe.sync(docs)

df = pipe.get_data(params={'id': [1, 2]})
print(df)
#           dt  id    val           attrs
# 0 2023-01-01   1  123.4  {'foo': 'bar'}
# 1 2023-01-01   2  567.8            None
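
In the output above, the second sync fills in attrs on the existing row for id 1 instead of adding a duplicate, because rows are matched on the index columns. The plain-Python sketch below mimics that result. It is not Meerschaum's algorithm; merge_docs is a hypothetical helper that assumes rows are keyed on ('dt', 'id') and that incoming values overwrite existing ones.

```python
def merge_docs(
    existing: list[dict],
    incoming: list[dict],
    keys: tuple[str, ...] = ("dt", "id"),
) -> list[dict]:
    """Merge incoming docs into existing ones, matching rows on `keys`."""
    merged = {tuple(doc[k] for k in keys): dict(doc) for doc in existing}
    for doc in incoming:
        key = tuple(doc[k] for k in keys)
        # Update a matching row in place, or start a new one.
        merged.setdefault(key, {}).update(doc)
    return list(merged.values())


rows = merge_docs([], [
    {'dt': '2023-01-01', 'id': 1, 'val': 123.4},
    {'dt': '2023-01-01', 'id': 2, 'val': 567.8},
])
rows = merge_docs(rows, [
    {'dt': '2023-01-01', 'id': 1, 'attrs': {'foo': 'bar'}},
])
for row in rows:
    print(row)
```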

Extensible Connectors Interface

Add custom connector types with the @make_connector decorator.

from meerschaum.connectors import make_connector, Connector
required = ['requests']

@make_connector
class NWSConnector(Connector):

    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(self, pipe, begin=None, end=None, **kw):
        params = {}
        begin = begin or pipe.get_sync_time()
        if begin:
            params['start'] = begin.isoformat()
        if end:
            params['end'] = end.isoformat()

        stations = pipe.parameters.get('nws', {}).get('stations', [])
        for station in stations:
            url = f"https://api.weather.gov/stations/{station}/observations"
            response = self.session.get(url, params=params)
            yield [
                feature['properties']
                for feature in response.json()['features']
            ]

    @property
    def session(self):
        _sesh = self.__dict__.get('_session', None)
        if _sesh is not None:
            return _sesh
        import requests
        self._session = requests.Session()
        self._session.auth = (self.username, self.password)
        return self._session

Support the Project

I work on Meerschaum in my free time, so if you enjoy the project and want to support its development, feel free to buy me a coffee! You can also support the project on my GitHub Sponsors page.

Consulting Services

If you'd like to put my skills to work on your project, I offer consulting services. Reach out on LinkedIn to get in touch, or commission my help through my GitHub Sponsors page.