🗄️ Instance Connectors
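Instance connectors store pipes' metadata (registrations) and target tables. An instance connector may also be used as a source connector (e.g. the SQLConnector).
To use your custom connector type as an instance connector, inherit from InstanceConnector and implement the following methods, replacing the pseudocode under the TODO comments with your connector's equivalent. See the MongoDBConnector for a concrete reference. The method templates below assume the module-level imports import meerschaum as mrsm, from datetime import datetime, and from typing import Any.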
Implementing get_pipe_data or get_pipe_docs
For reading data from a pipe's target table, you may implement either get_pipe_data() or get_pipe_docs():
- Implement get_pipe_data() to return a pd.DataFrame (recommended for SQL-based connectors).
- Implement get_pipe_docs() to return a list[dict] (recommended for document stores and connectors that produce JSON natively). This avoids DataFrame construction overhead when callers use Pipe.get_docs() or Pipe.get_data(as_docs=True).
The default get_pipe_data() calls get_pipe_docs() and wraps the result in a DataFrame. The default get_pipe_docs() calls get_pipe_data() and converts to records. You only need to implement one.
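To make that fallback concrete, here is a toy, stdlib-only model of two mutually defaulting read methods. ToyInstanceConnector, DocsOnlyConnector, and DataOnlyConnector are hypothetical names, and a column-oriented dict stands in for a pd.DataFrame, so this sketches the idea rather than reproducing Meerschaum's actual base class.

```python
from __future__ import annotations

from typing import Any


class ToyInstanceConnector:
    """Hypothetical base class whose two read methods default to each other."""

    def get_pipe_data(self, pipe: str, **kwargs: Any) -> dict[str, list[Any]]:
        # Default: wrap the documents in a column-oriented "frame".
        docs = self.get_pipe_docs(pipe, **kwargs)
        columns: dict[str, list[Any]] = {}
        for doc in docs:
            for col in doc:
                columns.setdefault(col, [])
        for doc in docs:
            for col, values in columns.items():
                values.append(doc.get(col))  # missing keys become None
        return columns

    def get_pipe_docs(self, pipe: str, **kwargs: Any) -> list[dict[str, Any]]:
        # Default: convert the "frame" back into records.
        frame = self.get_pipe_data(pipe, **kwargs)
        num_rows = len(next(iter(frame.values()), []))
        return [{col: vals[i] for col, vals in frame.items()} for i in range(num_rows)]


class DocsOnlyConnector(ToyInstanceConnector):
    """Overrides only get_pipe_docs(); get_pipe_data() comes from the base class."""

    def get_pipe_docs(self, pipe: str, **kwargs: Any) -> list[dict[str, Any]]:
        return [{'id': 1, 'color': 'red'}, {'id': 2}]


class DataOnlyConnector(ToyInstanceConnector):
    """Overrides only get_pipe_data(); get_pipe_docs() comes from the base class."""

    def get_pipe_data(self, pipe: str, **kwargs: Any) -> dict[str, list[Any]]:
        return {'id': [1, 2], 'color': ['red', 'blue']}


print(DocsOnlyConnector().get_pipe_data('demo'))
# {'id': [1, 2], 'color': ['red', None]}
print(DataOnlyConnector().get_pipe_docs('demo'))
# [{'id': 1, 'color': 'red'}, {'id': 2, 'color': 'blue'}]
```

In this toy model, a subclass that overrides neither method would recurse until Python raises RecursionError, so at least one of the two must be overridden.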
Inherit from InstanceConnector
Your connector class inherits from InstanceConnector, a subclass of Connector which implements the pipes' methods as an interface.
Using the params Filter
Methods which take the params argument (e.g. get_pipe_data(), get_pipe_docs(), get_sync_time(), clear_pipe(), and get_pipe_rowcount()) should apply it similarly to the filters applied in fetch_pipes_keys().
The easiest way to support params is with meerschaum.utils.dataframe.query_df():
```python
from meerschaum.utils.dataframe import query_df, parse_df_datetimes

df = parse_df_datetimes([
    {'ts': '2024-01-01 00:00:00', 'color': 'red'},
    {'ts': '2024-02-02 02:00:00', 'color': 'blue'},
    {'ts': '2024-03-03 03:00:00', 'color': 'green'},
])

print(query_df(df, {'color': 'red'}))
#           ts color
# 0 2024-01-01   red

print(query_df(df, begin='2024-02-01', datetime_column='ts'))
#                    ts color
# 1 2024-02-02 02:00:00  blue
# 2 2024-03-03 03:00:00 green
```
For advanced implementations, see the definition for MongoDBConnector.build_query() for an example of how to adapt the params filter to your connector's query specification.
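If your connector returns documents and you would rather not build a DataFrame just to filter it, the same semantics can be approximated in plain Python. This is a sketch, not Meerschaum's implementation: match_params() and query_docs() are hypothetical helpers, and they assume list values are joined by OR, a leading underscore negates a value, begin is inclusive, and end is exclusive, mirroring the filter descriptions on this page.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any


def _as_list(value: Any) -> list[Any]:
    # params values may be single values or lists of values.
    return list(value) if isinstance(value, (list, tuple, set)) else [value]


def _is_negated(value: Any) -> bool:
    # Assumption: a leading underscore marks a negated (NOT IN) value.
    return isinstance(value, str) and value.startswith('_')


def match_params(doc: dict[str, Any], params: dict[str, Any]) -> bool:
    """Return True if doc satisfies every key in params (keys are AND'd)."""
    for key, value in params.items():
        vals = _as_list(value)
        include = [v for v in vals if not _is_negated(v)]  # OR'd (IN)
        exclude = [v[1:] for v in vals if _is_negated(v)]  # NOT IN
        if include and doc.get(key) not in include:
            return False
        if doc.get(key) in exclude:
            return False
    return True


def query_docs(
    docs: list[dict[str, Any]],
    params: dict[str, Any] | None = None,
    begin: datetime | None = None,
    end: datetime | None = None,
    datetime_column: str | None = None,
) -> list[dict[str, Any]]:
    """Keep documents matching params within [begin, end) on datetime_column."""
    results = []
    for doc in docs:
        if params and not match_params(doc, params):
            continue
        if datetime_column is not None:
            dt_val = doc.get(datetime_column)
            if begin is not None and (dt_val is None or dt_val < begin):
                continue
            if end is not None and (dt_val is None or dt_val >= end):
                continue
        results.append(doc)
    return results


docs = [
    {'ts': datetime(2024, 1, 1), 'color': 'red'},
    {'ts': datetime(2024, 2, 2, 2), 'color': 'blue'},
    {'ts': datetime(2024, 3, 3, 3), 'color': 'green'},
]
print([d['color'] for d in query_docs(docs, {'color': ['red', 'blue']})])
# ['red', 'blue']
print([d['color'] for d in query_docs(docs, {'color': '_red'}, begin=datetime(2024, 2, 1), datetime_column='ts')])
# ['blue', 'green']
```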
register_pipe()
Store a pipe's attributes in a pipes table.
The attributes row of a pipe includes the pipe's keys (immutable) and parameters dictionary (mutable):
- connector_keys (str)
- metric_key (str)
- location_key (str | None): you may store "None" in place of None.
- parameters (dict[str, Any]): you can access the in-memory parameters with pipe._attributes.get('parameters', {}).
```python
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Insert the pipe's attributes into the internal `pipes` table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be registered.

    Returns
    -------
    A `SuccessTuple` of the result.
    """
    attributes = {
        'connector_keys': str(pipe.connector_keys),
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
        'parameters': pipe._attributes.get('parameters', {}),
    }

    ### TODO insert `attributes` as a row in the pipes table.
    # self.pipes_collection.insert_one(attributes)

    return True, "Success"
```
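A minimal sketch of storing the registration row, using an in-memory sqlite3 database as a stand-in for your pipes table. The schema, create_pipes_table(), and register_pipe_row() are hypothetical, and refusing duplicate registrations is a choice made for this sketch. It also shows one practical reason to store "None": SQL treats NULLs as distinct from one another, so a UNIQUE constraint over the keys would not catch two location-less registrations of the same pipe.

```python
from __future__ import annotations

import json
import sqlite3
from typing import Any


def create_pipes_table(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: the three keys are unique together,
    # and parameters are stored as JSON text.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS pipes ("
        " pipe_id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " connector_keys TEXT NOT NULL,"
        " metric_key TEXT NOT NULL,"
        " location_key TEXT NOT NULL,"
        " parameters TEXT NOT NULL,"
        " UNIQUE (connector_keys, metric_key, location_key))"
    )


def register_pipe_row(
    conn: sqlite3.Connection,
    connector_keys: str,
    metric_key: str,
    location_key: str | None,
    parameters: dict[str, Any],
) -> tuple[bool, str]:
    # str(None) == 'None', so the UNIQUE constraint also rejects
    # duplicate pipes that have no location.
    keys = (str(connector_keys), str(metric_key), str(location_key))
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "INSERT INTO pipes (connector_keys, metric_key, location_key, parameters)"
                " VALUES (?, ?, ?, ?)",
                (*keys, json.dumps(parameters)),
            )
    except sqlite3.IntegrityError:
        return False, f"Pipe {keys} is already registered."
    return True, "Success"


conn = sqlite3.connect(':memory:')
create_pipes_table(conn)
print(register_pipe_row(conn, 'sql:main', 'demo', None, {'tags': ['foo']}))
# (True, 'Success')
print(register_pipe_row(conn, 'sql:main', 'demo', None, {})[0])
# False
```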
get_pipe_attributes()
Return the attributes dictionary for a pipe (see register_pipe() above).
Note that a pipe's attributes must be JSON-serializable, so objects like MongoDB's ObjectId must be converted to strings.
```python
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> dict[str, Any]:
    """
    Return the pipe's document from the internal `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose attributes should be retrieved.

    Returns
    -------
    The document that matches the keys of the pipe.
    """
    query = {
        'connector_keys': str(pipe.connector_keys),
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
    }

    ### TODO query the `pipes` table either using these keys or `get_pipe_id()`.
    result = {}
    # result = self.pipes_collection.find_one(query) or {}

    return result
```
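One way to satisfy the JSON-serializable requirement is to round-trip each document through the json module with default=str. This sketch uses a hypothetical FakeObjectId class in place of MongoDB's ObjectId, and to_json_safe() is an illustrative name. Note that the same round-trip also turns other non-JSON values (such as datetimes) into strings and tuples into lists, which may or may not be what you want.

```python
from __future__ import annotations

import json
from typing import Any


class FakeObjectId:
    """Hypothetical stand-in for a non-JSON type such as bson's ObjectId."""

    def __init__(self, hex_str: str) -> None:
        self.hex_str = hex_str

    def __str__(self) -> str:
        return self.hex_str


def to_json_safe(doc: dict[str, Any]) -> dict[str, Any]:
    # Round-trip through JSON; default=str stringifies any value
    # the json module cannot encode natively.
    return json.loads(json.dumps(doc, default=str))


raw = {
    '_id': FakeObjectId('65a1f0c2e4b0a1b2c3d4e5f6'),
    'connector_keys': 'sql:main',
    'parameters': {'tags': ['foo']},
}
attributes = to_json_safe(raw)
print(attributes['_id'])
# 65a1f0c2e4b0a1b2c3d4e5f6
```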
get_pipe_id()
Return the ID tied to the pipe's connector, metric, and location keys.
```python
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> str | int | None:
    """
    Return the ID for the pipe if it exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to return.

    Returns
    -------
    The ID for the pipe or `None`.
    """
    query = {
        'connector_keys': str(pipe.connector_keys),
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
    }

    ### TODO fetch the ID mapped to this pipe.
    return None
```
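A sketch of the lookup, assuming a hypothetical sqlite3 pipes table with an integer pipe_id column whose keys were stored with None normalized to "None" (as register_pipe() does above). get_pipe_id_row() is an illustrative name; the important detail is normalizing the keys the same way at lookup time as at registration time.

```python
from __future__ import annotations

import sqlite3


def get_pipe_id_row(
    conn: sqlite3.Connection,
    connector_keys: str,
    metric_key: str,
    location_key: str | None,
) -> int | None:
    # Match the same normalized keys used at registration (None -> 'None').
    row = conn.execute(
        "SELECT pipe_id FROM pipes"
        " WHERE connector_keys = ? AND metric_key = ? AND location_key = ?",
        (str(connector_keys), str(metric_key), str(location_key)),
    ).fetchone()
    return row[0] if row is not None else None


conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE pipes (pipe_id INTEGER PRIMARY KEY, connector_keys TEXT,"
    " metric_key TEXT, location_key TEXT, parameters TEXT)"
)
conn.execute("INSERT INTO pipes VALUES (1, 'sql:main', 'demo', 'None', '{}')")
print(get_pipe_id_row(conn, 'sql:main', 'demo', None))   # 1
print(get_pipe_id_row(conn, 'sql:main', 'other', None))  # None
```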
edit_pipe()
Update the parameters dictionary of a pipe's registration.
```python
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Edit the attributes of the pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose in-memory parameters must be persisted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    query = {
        'connector_keys': str(pipe.connector_keys),
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
    }
    pipe_parameters = pipe._attributes.get('parameters', {})

    ### TODO Update the row with new parameters.
    # self.pipes_collection.update_one(query, {'$set': {'parameters': pipe_parameters}})

    return True, "Success"
```
delete_pipe()
Delete a pipe's registration from the pipes table.
```python
def delete_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Delete a pipe's registration from the `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    pipe_id = self.get_pipe_id(pipe, debug=debug)
    if pipe_id is None:
        return False, f"{pipe} is not registered."

    ### TODO Delete the pipe's row from the pipes table.
    # self.pipes_collection.delete_one({'_id': pipe_id})

    return True, "Success"
```
fetch_pipes_keys()
Return registered pipes' keys according to the provided filters.
Each filter should only be applied if the given list is not empty.
Values within filters are joined by OR, and filters are joined by AND.
The function separate_negation_values() returns two sublists: regular values (IN) and values preceded by an underscore (NOT IN).
The recommended return value is a dictionary mapping each pipe's ID to a tuple of (connector_keys, metric_key, location_key, parameters).
Returning the parameters lets Meerschaum hydrate every pipe in a single round-trip instead of calling get_pipe_attributes() per pipe.
It also lets Meerschaum apply the --targets and --datetime-dtypes filters client-side (both are derived from parameters), so you only need to implement the connector_keys, metric_keys, location_keys, and tags filters here.
Legacy return values
For backwards compatibility, you may also return a plain list of tuples, and tuples may be length 3 (omitting parameters). These forms are supported but lose the single-round-trip benefit above — prefer the dictionary form for new connectors.
```python
def fetch_pipes_keys(
    self,
    connector_keys: list[str] | None = None,
    metric_keys: list[str] | None = None,
    location_keys: list[str] | None = None,
    tags: list[str] | None = None,
    debug: bool = False,
    **kwargs: Any
) -> dict[str | int, tuple[str, str, str, dict]]:
    """
    Return registered pipes' keys according to the provided filters.

    Parameters
    ----------
    connector_keys: list[str] | None, default None
        The keys passed via `-c`.

    metric_keys: list[str] | None, default None
        The keys passed via `-m`.

    location_keys: list[str] | None, default None
        The keys passed via `-l`.

    tags: list[str] | None, default None
        Tags passed via `--tags` which are stored under `parameters:tags`.

    Returns
    -------
    A dictionary mapping each pipe's ID to a tuple of
    `(connector_keys, metric_key, location_key, parameters)`.
    You may return the string `"None"` for location keys in place of nulls.

    Including `parameters` in each tuple lets Meerschaum apply the `--targets`
    and `--datetime-dtypes` filters client-side, so you do not need to handle
    those filters here.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> conn = mrsm.get_connector('example:demo')
    >>>
    >>> pipe_a = mrsm.Pipe('a', 'demo', tags=['foo'], instance=conn)
    >>> pipe_b = mrsm.Pipe('b', 'demo', tags=['bar'], instance=conn)
    >>> pipe_a.register()
    >>> pipe_b.register()
    >>>
    >>> conn.fetch_pipes_keys(['a', 'b'])
    {1: ('a', 'demo', 'None', {'tags': ['foo']}), 2: ('b', 'demo', 'None', {'tags': ['bar']})}
    >>> conn.fetch_pipes_keys(metric_keys=['demo'])
    {1: ('a', 'demo', 'None', {'tags': ['foo']}), 2: ('b', 'demo', 'None', {'tags': ['bar']})}
    >>> conn.fetch_pipes_keys(tags=['foo'])
    {1: ('a', 'demo', 'None', {'tags': ['foo']})}
    >>> conn.fetch_pipes_keys(location_keys=[None])
    {1: ('a', 'demo', 'None', {'tags': ['foo']}), 2: ('b', 'demo', 'None', {'tags': ['bar']})}
    """
    from meerschaum.utils.misc import separate_negation_values

    in_ck, nin_ck = separate_negation_values([str(val) for val in (connector_keys or [])])
    in_mk, nin_mk = separate_negation_values([str(val) for val in (metric_keys or [])])
    in_lk, nin_lk = separate_negation_values([str(val) for val in (location_keys or [])])
    in_tags, nin_tags = separate_negation_values([str(val) for val in (tags or [])])

    ### TODO build a query like so, only including clauses if the given list is not empty.
    ### The `tags` clause is an OR ("?|"), meaning any of the tags may match.
    ###
    ### SELECT pipe_id, connector_keys, metric_key, location_key, parameters
    ### FROM pipes
    ### WHERE connector_keys IN ({in_ck})
    ###     AND connector_keys NOT IN ({nin_ck})
    ###     AND metric_key IN ({in_mk})
    ###     AND metric_key NOT IN ({nin_mk})
    ###     AND location_key IN ({in_lk})
    ###     AND location_key NOT IN ({nin_lk})
    ###     AND (parameters->'tags')::JSONB ?| ARRAY[{in_tags}]
    ###     AND NOT (parameters->'tags')::JSONB ?| ARRAY[{nin_tags}]
    ###
    ### Return a dict mapping each pipe's ID to its keys and parameters, e.g.:
    ### {1: ('a', 'demo', 'None', {'tags': ['foo']})}
    return {}
```
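For connectors that cannot push these filters into a query, the same logic can be applied in Python over already-loaded registrations. This is a sketch: split_negations() is a hypothetical stand-in for separate_negation_values() that assumes the underscore prefix is stripped from negated values, and filter_pipes() is an illustrative helper that returns the recommended dictionary form.

```python
from __future__ import annotations

from typing import Any


def split_negations(values: list[str]) -> tuple[list[str], list[str]]:
    # Hypothetical stand-in for separate_negation_values(): values prefixed
    # with '_' go to the NOT IN list, with the prefix stripped.
    include = [v for v in values if not v.startswith('_')]
    exclude = [v[1:] for v in values if v.startswith('_')]
    return include, exclude


def filter_pipes(
    rows: dict[int, tuple[str, str, str, dict[str, Any]]],
    connector_keys: list[str] | None = None,
    metric_keys: list[str] | None = None,
    location_keys: list[str | None] | None = None,
    tags: list[str] | None = None,
) -> dict[int, tuple[str, str, str, dict[str, Any]]]:
    # (tuple position, (IN values, NOT IN values)) for each key filter.
    key_filters = [
        (0, split_negations([str(v) for v in (connector_keys or [])])),
        (1, split_negations([str(v) for v in (metric_keys or [])])),
        (2, split_negations([str(v) for v in (location_keys or [])])),
    ]
    in_tags, nin_tags = split_negations([str(v) for v in (tags or [])])

    results = {}
    for pipe_id, row in rows.items():
        # Filters are AND'd together; values within one filter are OR'd.
        # An empty IN list means "no filter" for that key.
        rejected = any(
            (include and row[pos] not in include) or row[pos] in exclude
            for pos, (include, exclude) in key_filters
        )
        if rejected:
            continue
        pipe_tags = set(row[3].get('tags', []))
        # Like ?|: keep the pipe if ANY requested tag is present...
        if in_tags and not pipe_tags & set(in_tags):
            continue
        # ...and drop it if ANY negated tag is present.
        if pipe_tags & set(nin_tags):
            continue
        results[pipe_id] = row
    return results


rows = {
    1: ('a', 'demo', 'None', {'tags': ['foo']}),
    2: ('b', 'demo', 'None', {'tags': ['bar']}),
}
print(filter_pipes(rows, tags=['foo']))
# {1: ('a', 'demo', 'None', {'tags': ['foo']})}
print(list(filter_pipes(rows, connector_keys=['_a'])))
# [2]
print(list(filter_pipes(rows, location_keys=[None])))
# [1, 2]
```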
pipe_exists()
Return True if the target table exists and has data.
```python
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> bool:
    """
    Check whether a pipe's target table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to check whether its table exists.

    Returns
    -------
    A `bool` indicating the table exists.
    """
    table_name = pipe.target

    ### TODO write a query to determine the existence of `table_name`.
    table_exists = False

    return table_exists
```
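A sketch with sqlite3 as a stand-in database. It follows the stricter reading in the heading above (the table exists and has at least one row); drop the second query if your connector only needs to check existence. table_exists_with_data() is a hypothetical name.

```python
import sqlite3


def table_exists_with_data(conn: sqlite3.Connection, table_name: str) -> bool:
    # Look up the table in SQLite's schema catalog (the name is a bound parameter).
    found = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()
    if found is None:
        return False
    # "Exists and has data": check for at least one row.
    quoted = '"' + table_name.replace('"', '""') + '"'
    return conn.execute(f"SELECT 1 FROM {quoted} LIMIT 1").fetchone() is not None


conn = sqlite3.connect(':memory:')
print(table_exists_with_data(conn, 'demo'))  # False (no table)
conn.execute('CREATE TABLE demo (ts TEXT, val INTEGER)')
print(table_exists_with_data(conn, 'demo'))  # False (empty table)
conn.execute("INSERT INTO demo VALUES ('2024-01-01', 1)")
print(table_exists_with_data(conn, 'demo'))  # True
```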
drop_pipe()
Drop the pipe's target table.
```python
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Drop a pipe's target table if it exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be dropped.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    table_name = pipe.target

    ### TODO write a query to drop `table_name`.

    return True, "Success"
```
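Because table names cannot be passed as bound query parameters, a drop statement has to embed the identifier, which makes quoting important. A minimal sketch with sqlite3 as a stand-in; quote_identifier() and drop_table() are hypothetical helpers, and IF EXISTS mirrors the "if it exists" wording in the docstring.

```python
from __future__ import annotations

import sqlite3


def quote_identifier(name: str) -> str:
    # SQL-standard identifier quoting: double embedded quotes, then wrap.
    return '"' + name.replace('"', '""') + '"'


def drop_table(conn: sqlite3.Connection, table_name: str) -> tuple[bool, str]:
    # IF EXISTS makes dropping a never-synced pipe a no-op rather than an error.
    with conn:
        conn.execute(f"DROP TABLE IF EXISTS {quote_identifier(table_name)}")
    return True, "Success"


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE "sql_main_demo" (val INTEGER)')
print(drop_table(conn, 'sql_main_demo'))  # (True, 'Success')
print(drop_table(conn, 'sql_main_demo'))  # (True, 'Success'), already gone
```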
drop_pipe_indices() (optional)
If syncing to your instance connector involves indexing a pipe's target table, you may find it useful to implement the method drop_pipe_indices() (for the action drop indices). See the SQLConnector.drop_pipe_indices() method for reference.
sync_pipe()
Upsert new data into the pipe's table.
You may use the built-in method pipe.filter_existing() to separate inserts from updates if the database behind this connector does not support upserts natively.
The values of the pipe.columns dictionary are immutable indices to be used for upserts. You may improve performance by indexing these columns during the initial sync (i.e. when pipe.exists() is False before syncing).
```python
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: 'pd.DataFrame',
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Upsert new documents into the pipe's target table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table should receive the new documents.

    df: pd.DataFrame
        The data to be synced.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    ### TODO Write the upsert logic for the target table.
    ### `pipe.filter_existing()` is provided for your convenience to
    ### remove duplicates and separate inserts (`unseen_df`) from updates (`update_df`).
    unseen_df, update_df, delta_df = pipe.filter_existing(df, debug=debug)

    return True, "Success"
```
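If your database supports upserts natively, you may not need pipe.filter_existing() at all. The sketch below uses SQLite's INSERT ... ON CONFLICT ... DO UPDATE (available since SQLite 3.24) as a stand-in, assuming that index_cols holds the values of pipe.columns and that every document shares the same keys. upsert_docs() is a hypothetical helper, and plain dicts stand in for the DataFrame.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def quote(name: str) -> str:
    # Quote an identifier, doubling any embedded double quotes.
    return '"' + name.replace('"', '""') + '"'


def upsert_docs(
    conn: sqlite3.Connection,
    table: str,
    docs: list[dict[str, Any]],
    index_cols: list[str],
) -> tuple[bool, str]:
    if not docs:
        return True, "Nothing to sync."
    cols = list(docs[0])  # this sketch assumes every doc has the same keys
    value_cols = [c for c in cols if c not in index_cols]
    col_list = ', '.join(map(quote, cols))
    index_list = ', '.join(map(quote, index_cols))
    if value_cols:
        on_conflict = 'DO UPDATE SET ' + ', '.join(f"{quote(c)} = excluded.{quote(c)}" for c in value_cols)
    else:
        on_conflict = 'DO NOTHING'
    with conn:
        # On the first sync, create the table and a UNIQUE index on the index
        # columns; ON CONFLICT needs that index to detect existing rows.
        conn.execute(f"CREATE TABLE IF NOT EXISTS {quote(table)} ({col_list})")
        conn.execute(f"CREATE UNIQUE INDEX IF NOT EXISTS {quote('UQ_' + table)} ON {quote(table)} ({index_list})")
        conn.executemany(
            f"INSERT INTO {quote(table)} ({col_list}) VALUES ({', '.join('?' for _ in cols)}) "
            f"ON CONFLICT ({index_list}) {on_conflict}",
            [tuple(doc[c] for c in cols) for doc in docs],
        )
    return True, f"Upserted {len(docs)} row(s)."


conn = sqlite3.connect(':memory:')
upsert_docs(conn, 'demo', [{'id': 1, 'color': 'red'}, {'id': 2, 'color': 'blue'}], ['id'])
upsert_docs(conn, 'demo', [{'id': 1, 'color': 'green'}], ['id'])
print(conn.execute('SELECT id, color FROM demo ORDER BY id').fetchall())
# [(1, 'green'), (2, 'blue')]
```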
sync_pipe_inplace() (optional)
For situations where the source and instance connectors are the same, the method sync_pipe_inplace() allows you to bypass loading DataFrames into RAM and instead handle the syncs remotely. See the SQLConnector.sync_pipe_inplace() method for reference.
create_pipe_indices() (optional)
If syncing to your instance connector involves indexing a pipe's target table, you may find it useful to implement the method create_pipe_indices(). See the method SQLConnector.create_pipe_indices() for reference.
clear_pipe()
Delete a pipe's data within a bounded or unbounded interval without dropping the table:
```python
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: datetime | int | None = None,
    end: datetime | int | None = None,
    params: dict[str, Any] | None = None,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Delete rows within `begin`, `end`, and `params`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to clear.

    begin: datetime | int | None, default None
        If provided, remove rows >= `begin`.

    end: datetime | int | None, default None
        If provided, remove rows < `end`.

    params: dict[str, Any] | None, default None
        If provided, only remove rows which match the `params` filter.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    ### TODO Write a query to remove rows which match `begin`, `end`, and `params`.
    return True, "Success"
```
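A sketch of the DELETE with sqlite3 as a stand-in, using bound parameters for values and quoted identifiers for column names. build_where() and clear_rows() are hypothetical helpers; for brevity, params only supports single equality values here, and the datetime axis is stored as ISO-8601 text, which sorts chronologically.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def quote(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def build_where(
    dt_col: str | None,
    begin: Any = None,
    end: Any = None,
    params: dict[str, Any] | None = None,
) -> tuple[str, list[Any]]:
    """Return a WHERE clause (begin inclusive, end exclusive) and its bound values."""
    clauses: list[str] = []
    values: list[Any] = []
    if dt_col is not None and begin is not None:
        clauses.append(f"{quote(dt_col)} >= ?")
        values.append(begin)
    if dt_col is not None and end is not None:
        clauses.append(f"{quote(dt_col)} < ?")
        values.append(end)
    for col, val in (params or {}).items():
        clauses.append(f"{quote(col)} = ?")  # single values only, for brevity
        values.append(val)
    return ((' WHERE ' + ' AND '.join(clauses)) if clauses else ''), values


def clear_rows(conn, table, dt_col, begin=None, end=None, params=None) -> tuple[bool, str]:
    where, values = build_where(dt_col, begin, end, params)
    with conn:
        cursor = conn.execute(f"DELETE FROM {quote(table)}{where}", values)
    return True, f"Deleted {cursor.rowcount} row(s)."


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE demo (ts TEXT, color TEXT)')
conn.executemany('INSERT INTO demo VALUES (?, ?)', [
    ('2024-01-01', 'red'), ('2024-02-02', 'blue'), ('2024-03-03', 'red'),
])
print(clear_rows(conn, 'demo', 'ts', begin='2024-02-01', params={'color': 'red'}))
# (True, 'Deleted 1 row(s).')
```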
deduplicate_pipe() (optional)
Like sync_pipe_inplace(), you may choose to implement deduplicate_pipe() for a performance boost. Otherwise, the default implementation relies upon get_pipe_data(), clear_pipe(), and get_pipe_rowcount(). See the SQLConnector.deduplicate_pipe() method for reference.
get_pipe_data()
Return the target table's data according to the filters.
The begin and end arguments correspond to the designated datetime axis (pipe.columns['datetime']).
The params argument behaves the same as the fetch_pipes_keys() filters but may also contain single values. See Using the params Filter above on building queries with params.
The convenience function parse_df_datetimes() casts dataframe-like lists of dictionaries (or dictionaries of lists) into DataFrames, automatically casting ISO strings to datetimes.
```python
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: list[str] | None = None,
    omit_columns: list[str] | None = None,
    begin: datetime | int | None = None,
    end: datetime | int | None = None,
    params: dict[str, Any] | None = None,
    debug: bool = False,
    **kwargs: Any
) -> 'pd.DataFrame | None':
    """
    Query a pipe's target table and return the DataFrame.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: list[str] | None, default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: list[str] | None, default None
        If provided, remove these columns from the selection.

    begin: datetime | int | None, default None
        The earliest `datetime` value to search from (inclusive).

    end: datetime | int | None, default None
        The latest `datetime` value to search up to (exclusive).

    params: dict[str, Any] | None, default None
        Additional filters to apply to the query.

    Returns
    -------
    The target table's data as a DataFrame.
    """
    table_name = pipe.target
    dt_col = pipe.columns.get("datetime", None)

    ### TODO Write a query to fetch from `table_name`
    ### and apply the filters `begin`, `end`, and `params`.
    ###
    ### To improve performance, add logic to only read from
    ### `select_columns` and not `omit_columns` (if provided).
    ###
    ### SELECT {', '.join(cols_to_select)}
    ### FROM "{table_name}"
    ### WHERE "{dt_col}" >= '{begin}'
    ###     AND "{dt_col}" < '{end}'

    ### The function `parse_df_datetimes()` is a convenience function
    ### to cast a list of dictionaries into a DataFrame and convert datetime columns.
    from meerschaum.utils.dataframe import parse_df_datetimes
    rows = []
    return parse_df_datetimes(rows)
```
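A sketch of applying select_columns, omit_columns, begin, and end, using sqlite3 as a stand-in database. resolve_columns() and read_rows() are hypothetical helpers; the list of dictionaries they return is the kind of input parse_df_datetimes() accepts, and params handling is omitted for brevity.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def quote(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def resolve_columns(
    all_cols: list[str],
    select_columns: list[str] | None = None,
    omit_columns: list[str] | None = None,
) -> list[str]:
    # Start from select_columns (or every column), then drop omit_columns,
    # keeping the table's own column order.
    cols = [c for c in all_cols if not select_columns or c in select_columns]
    return [c for c in cols if c not in (omit_columns or [])]


def read_rows(
    conn: sqlite3.Connection,
    table: str,
    dt_col: str | None = None,
    select_columns: list[str] | None = None,
    omit_columns: list[str] | None = None,
    begin: Any = None,
    end: Any = None,
) -> list[dict[str, Any]]:
    # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk) per column.
    all_cols = [info[1] for info in conn.execute(f"PRAGMA table_info({quote(table)})")]
    cols = resolve_columns(all_cols, select_columns, omit_columns)
    if not cols:
        return []
    clauses, values = [], []
    if dt_col is not None and begin is not None:
        clauses.append(f"{quote(dt_col)} >= ?")
        values.append(begin)
    if dt_col is not None and end is not None:
        clauses.append(f"{quote(dt_col)} < ?")
        values.append(end)
    query = f"SELECT {', '.join(map(quote, cols))} FROM {quote(table)}"
    if clauses:
        query += ' WHERE ' + ' AND '.join(clauses)
    return [dict(zip(cols, row)) for row in conn.execute(query, values)]


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE demo (ts TEXT, color TEXT, size TEXT)')
conn.executemany('INSERT INTO demo VALUES (?, ?, ?)', [
    ('2024-01-01', 'red', 'M'),
    ('2024-02-02', 'blue', 'L'),
])
print(read_rows(conn, 'demo', dt_col='ts', omit_columns=['size'], begin='2024-02-01'))
# [{'ts': '2024-02-02', 'color': 'blue'}]
```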
get_pipe_docs() (alternative to get_pipe_data())
Return the target table's data as a list[dict]. Implement this instead of get_pipe_data() when your connector produces JSON natively (e.g. document stores, REST APIs), avoiding DataFrame construction overhead.
Callers that use Pipe.get_docs() or Pipe.get_data(as_docs=True) will call get_pipe_docs() directly, bypassing dtype enforcement and Pandas overhead.
```python
def get_pipe_docs(
    self,
    pipe: mrsm.Pipe,
    select_columns: list[str] | None = None,
    omit_columns: list[str] | None = None,
    begin: datetime | int | None = None,
    end: datetime | int | None = None,
    params: dict[str, Any] | None = None,
    order: str = 'asc',
    limit: int | None = None,
    debug: bool = False,
    **kwargs: Any
) -> list[dict[str, Any]]:
    """
    Return a pipe's data as a list of documents.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: list[str] | None, default None
        If provided, only select these given columns.

    omit_columns: list[str] | None, default None
        If provided, remove these columns from the selection.

    begin: datetime | int | None, default None
        The earliest `datetime` value to search from (inclusive).

    end: datetime | int | None, default None
        The latest `datetime` value to search up to (exclusive).

    params: dict[str, Any] | None, default None
        Additional filters to apply to the query.

    order: str, default 'asc'
        Sort order for the `datetime` axis (`'asc'` or `'desc'`).

    limit: int | None, default None
        If provided, cap the number of rows returned.

    Returns
    -------
    The target table's data as a list of dictionaries.
    """
    table_name = pipe.target
    dt_col = pipe.columns.get("datetime", None)

    ### TODO Write a query to fetch from `table_name` and return raw dicts.
    ### Apply begin, end, params, order, and limit as in get_pipe_data().
    rows = []
    return rows
```
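A sketch of honoring order and limit with sqlite3 as a stand-in. The sort direction cannot be a bound parameter, so it is whitelisted rather than interpolated blindly; read_docs() is a hypothetical helper, and the begin, end, and params filters are omitted for brevity.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def quote(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def read_docs(
    conn: sqlite3.Connection,
    table: str,
    dt_col: str,
    order: str = 'asc',
    limit: int | None = None,
) -> list[dict[str, Any]]:
    # ORDER BY direction cannot be a bound parameter, so whitelist it.
    direction = order.upper()
    if direction not in ('ASC', 'DESC'):
        raise ValueError(f"Invalid order: {order!r}")
    query = f"SELECT * FROM {quote(table)} ORDER BY {quote(dt_col)} {direction}"
    values: list[Any] = []
    if limit is not None:
        query += ' LIMIT ?'
        values.append(int(limit))
    cursor = conn.execute(query, values)
    # Build plain dicts from the column names in cursor.description.
    names = [desc[0] for desc in cursor.description]
    return [dict(zip(names, row)) for row in cursor]


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE demo (ts TEXT, color TEXT)')
conn.executemany('INSERT INTO demo VALUES (?, ?)', [
    ('2024-01-01', 'red'), ('2024-02-02', 'blue'), ('2024-03-03', 'green'),
])
print(read_docs(conn, 'demo', 'ts', order='desc', limit=2))
# [{'ts': '2024-03-03', 'color': 'green'}, {'ts': '2024-02-02', 'color': 'blue'}]
```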
get_sync_time()
Return the largest (or smallest) value of the target table's datetime axis, according to the params filter.
```python
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    params: dict[str, Any] | None = None,
    newest: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> datetime | int | None:
    """
    Return the most recent value for the `datetime` axis.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table contains the data.

    params: dict[str, Any] | None, default None
        Filter certain parameters when determining the sync time.

    newest: bool, default True
        If `True`, return the maximum value for the column.
        Otherwise return the minimum value.

    Returns
    -------
    The largest (or smallest if `newest` is `False`) `datetime` or `int` value
    of the `datetime` axis, or `None` if no rows match.
    """
    table_name = pipe.target
    dt_col = pipe.columns.get("datetime", None)

    ### TODO write a query to get the largest value for `dt_col` in `table_name`.
    ### If `newest` is `False`, return the smallest value.
    ### Apply the `params` filter in case of multiplexing.
    return None
```
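A sketch with sqlite3 as a stand-in, assuming the datetime axis is stored as ISO-8601 text and params holds single equality values. sync_time() is a hypothetical helper; MAX or MIN over zero rows returns NULL, which maps naturally onto the None return value.

```python
from __future__ import annotations

import sqlite3
from datetime import datetime
from typing import Any


def quote(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def sync_time(
    conn: sqlite3.Connection,
    table: str,
    dt_col: str,
    newest: bool = True,
    params: dict[str, Any] | None = None,
) -> datetime | int | None:
    agg = 'MAX' if newest else 'MIN'
    clauses = [f"{quote(col)} = ?" for col in (params or {})]
    where = (' WHERE ' + ' AND '.join(clauses)) if clauses else ''
    value = conn.execute(
        f"SELECT {agg}({quote(dt_col)}) FROM {quote(table)}{where}",
        list((params or {}).values()),
    ).fetchone()[0]
    # MAX/MIN over zero rows yields NULL (None); ISO strings become datetimes.
    return datetime.fromisoformat(value) if isinstance(value, str) else value


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE demo (ts TEXT, id TEXT)')
conn.executemany('INSERT INTO demo VALUES (?, ?)', [
    ('2024-01-01 00:00:00', 'a'),
    ('2024-03-03 03:00:00', 'b'),
    ('2024-02-02 02:00:00', 'a'),
])
print(sync_time(conn, 'demo', 'ts'))                      # 2024-03-03 03:00:00
print(sync_time(conn, 'demo', 'ts', params={'id': 'a'}))  # 2024-02-02 02:00:00
print(sync_time(conn, 'demo', 'ts', newest=False))        # 2024-01-01 00:00:00
```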
get_pipe_columns_types()
Return columns and Pandas data types (you may also return PostgreSQL-style types). Implementing this method lets you take advantage of automatic dtype enforcement.
```python
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> dict[str, str]:
    """
    Return the data types for the columns in the target table for data type enforcement.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table contains columns and data types.

    Returns
    -------
    A dictionary mapping columns to data types.
    """
    table_name = pipe.target

    ### TODO write a query to fetch the columns contained in `table_name`.
    columns_types = {}

    ### Return a dictionary mapping the columns
    ### to their Pandas dtypes, e.g.:
    ### `{'foo': 'int64'}`
    ### or SQL-style dtypes, e.g.:
    ### `{'foo': 'INT'}`
    return columns_types
```
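For SQL-style types, the database catalog usually has the answer. A minimal sketch using sqlite3's PRAGMA table_info as a stand-in; columns_types() is a hypothetical helper that returns the declared SQL types rather than Pandas dtypes.

```python
from __future__ import annotations

import sqlite3


def quote(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def columns_types(conn: sqlite3.Connection, table: str) -> dict[str, str]:
    # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk) per
    # column, and no rows at all if the table does not exist.
    return {info[1]: info[2] for info in conn.execute(f"PRAGMA table_info({quote(table)})")}


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE demo (ts DATETIME, id INTEGER, color TEXT)')
print(columns_types(conn, 'demo'))
# {'ts': 'DATETIME', 'id': 'INTEGER', 'color': 'TEXT'}
print(columns_types(conn, 'missing'))
# {}
```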
get_pipe_columns_indices() (optional)
You may choose to implement get_pipe_columns_indices(), which returns a dictionary mapping each column to a list of its related indices. Additionally, you may implement get_pipe_index_names() to return the names of new indices to be created (see SQLConnector.get_pipe_index_names() for reference).
```python
def get_pipe_columns_indices(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> dict[str, list[dict[str, str]]]:
    """
    Return a dictionary mapping columns to metadata about related indices.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table has related indices.

    Returns
    -------
    A dictionary mapping each column to a list of dictionaries
    with the keys "type" and "name".

    Examples
    --------
    >>> pipe = mrsm.Pipe('demo', 'shirts', columns={'primary': 'id'}, indices={'size_color': ['color', 'size']})
    >>> pipe.sync([{'color': 'red', 'size': 'M'}])
    >>> pipe.get_columns_indices()
    {'id': [{'name': 'demo_shirts_pkey', 'type': 'PRIMARY KEY'}], 'color': [{'name': 'IX_demo_shirts_color_size', 'type': 'INDEX'}], 'size': [{'name': 'IX_demo_shirts_color_size', 'type': 'INDEX'}]}
    """
    ### TODO query the indices on `pipe.target` and group them by column.
    return {}
```
get_pipe_rowcount()
Return the number of rows in the pipe's target table within the begin, end, and params bounds:
```python
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: datetime | int | None = None,
    end: datetime | int | None = None,
    params: dict[str, Any] | None = None,
    remote: bool = False,
    debug: bool = False,
    **kwargs: Any
) -> int:
    """
    Return the rowcount for the pipe's table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table should be counted.

    begin: datetime | int | None, default None
        If provided, only count rows >= `begin`.

    end: datetime | int | None, default None
        If provided, only count rows < `end`.

    params: dict[str, Any] | None, default None
        If provided, only count rows that match the `params` filter.

    remote: bool, default False
        If `True`, return the rowcount for the pipe's fetch definition.
        In this case, `self` refers to `Pipe.connector`, not `Pipe.instance_connector`.

    Returns
    -------
    The rowcount for this pipe's table according to the given parameters.
    """
    table_name = pipe.target

    ### TODO write a query to count how many rows exist in `table_name` according to the filters.
    count = 0
    return count
```
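A sketch with sqlite3 as a stand-in, counting inside the database rather than loading rows into memory. count_rows() is a hypothetical helper; the remote case is out of scope here, and params supports single equality values only.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def quote(name: str) -> str:
    return '"' + name.replace('"', '""') + '"'


def count_rows(
    conn: sqlite3.Connection,
    table: str,
    dt_col: str | None = None,
    begin: Any = None,
    end: Any = None,
    params: dict[str, Any] | None = None,
) -> int:
    clauses: list[str] = []
    values: list[Any] = []
    if dt_col is not None and begin is not None:
        clauses.append(f"{quote(dt_col)} >= ?")
        values.append(begin)
    if dt_col is not None and end is not None:
        clauses.append(f"{quote(dt_col)} < ?")
        values.append(end)
    for col, val in (params or {}).items():
        clauses.append(f"{quote(col)} = ?")
        values.append(val)
    where = (' WHERE ' + ' AND '.join(clauses)) if clauses else ''
    # COUNT(*) runs in the database instead of loading rows into memory.
    return conn.execute(f"SELECT COUNT(*) FROM {quote(table)}{where}", values).fetchone()[0]


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE demo (ts TEXT, color TEXT)')
conn.executemany('INSERT INTO demo VALUES (?, ?)', [
    ('2024-01-01', 'red'), ('2024-02-02', 'blue'), ('2024-03-03', 'red'),
])
print(count_rows(conn, 'demo'))                           # 3
print(count_rows(conn, 'demo', 'ts', end='2024-03-01'))   # 2
print(count_rows(conn, 'demo', params={'color': 'red'}))  # 2
```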
get_pipe_size() (optional)
Return the on-disk size (in bytes) of a pipe's target table, or None if the size cannot be determined. This powers Pipe.get_size() and the Size column of show targets. The default implementation raises NotImplementedError. See the SQLConnector.get_pipe_size() method for reference.
```python
def get_pipe_size(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> int | None:
    """
    Return the on-disk size of a pipe's target table in bytes.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table size to measure.

    Returns
    -------
    An `int` of the number of bytes occupied by the target table,
    or `None` if the size cannot be determined.
    """
    table_name = pipe.target

    ### TODO write a query to measure the size of `table_name` in bytes.
    return None
```
compress_pipe() (optional)
Compress a pipe's target table to reduce disk usage (for the action compress pipes and Pipe.compress()). The default implementation returns a failure SuccessTuple indicating compression is unsupported. See the SQLConnector.compress_pipe() method for reference.
```python
def compress_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Compress a pipe's target table to reduce disk usage.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to compress.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    ### TODO write the logic to compress `pipe.target` (if supported).
    return False, f"Compression is not supported for instance connectors of type '{self.type}'."
```
decompress_pipe() (optional)
The inverse of compress_pipe() (for the action decompress pipes and Pipe.decompress()). Pass no_policy=True to decompress existing data now while leaving the compression policy in place (e.g. for a bulk backfill, after which data is recompressed on schedule). The default implementation returns a failure SuccessTuple indicating decompression is unsupported. See the SQLConnector.decompress_pipe() method for reference.
```python
def decompress_pipe(
    self,
    pipe: mrsm.Pipe,
    no_policy: bool = False,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Decompress a pipe's target table, the inverse of `compress_pipe()`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to decompress.

    no_policy: bool, default False
        If `True`, decompress existing data now but leave the compression policy in
        place so future data is recompressed on schedule.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    ### TODO write the logic to decompress `pipe.target` (if supported).
    return False, f"Decompression is not supported for instance connectors of type '{self.type}'."
```
vacuum_pipe() (optional)
Reclaim dead-tuple disk space from a pipe's target table (for the action vacuum pipes and Pipe.vacuum()). Pass full=True to perform a heavier rewrite that returns freed space to the OS (where supported). The default implementation returns a failure SuccessTuple indicating vacuuming is unsupported. See the SQLConnector.vacuum_pipe() method for reference.
```python
def vacuum_pipe(
    self,
    pipe: mrsm.Pipe,
    full: bool = False,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Reclaim dead-tuple disk space from a pipe's target table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to vacuum.

    full: bool, default False
        If `True`, perform a heavier rewrite that returns freed space to the
        operating system at the cost of an exclusive lock.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    ### TODO write the logic to vacuum `pipe.target` (if supported).
    return False, f"Vacuuming is not supported for instance connectors of type '{self.type}'."
```
analyze_pipe() (optional)
Refresh the database planner's statistics for a pipe's target table (for the action analyze pipes and Pipe.analyze()). Unlike vacuum_pipe(), this does not reclaim disk space — it helps the query planner choose better plans after large syncs. The default implementation returns a failure SuccessTuple indicating analysis is unsupported. See the SQLConnector.analyze_pipe() method for reference.
```python
def analyze_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Refresh the database planner's statistics for a pipe's target table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to analyze.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    ### TODO write the logic to analyze `pipe.target` (if supported).
    return False, f"Analysis is not supported for instance connectors of type '{self.type}'."
```