Skip to content

🗄️ Instance Connectors

Instance connectors store pipes' registrations and data in addition to the usual fetch() functionality of regular connectors, e.g. the SQLConnector.

To use your custom connector type as an instance connector, implement the following methods, replacing the pseudocode under the TODO comments with your connector's equivalent. See the MongoDBConnector for a specific reference.

The SuccessTuple type annotation is an alias for Tuple[bool, str] and may be imported:

1
from meerschaum.utils.typing import SuccessTuple
Using the params Filter

Methods which take the params argument (get_pipe_data(), get_sync_time(), get_backtrack_data()) behave similarly to the filters applied to fetch_pipes_keys.

See the definition for MongoDBConnector.build_query() for an example of how to adapt the params filter to your connector's query specification.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
>>> build_query({'a': 1})
{'a': {'$eq': 1}}
>>> 
>>> build_query({'a': '_b'})
{'a': {'$ne': 'b'}}
>>> 
>>> build_query({'a': ['c', '_d']})
{'a': {'$eq': 'c', {'$neq': 'd'}}}
>>> 
>>> build_query({'a': [1, 2, 3]})
{'a': {'$nin': [1, 2, 3]}}
>>> 
>>> build_query({'a': []})
{}
get_backtrack_data() Deprecation Notice

As of v1.7.0+, get_backtrack_data() was replaced with a generic alternative. Your connector may still override this method:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def get_backtrack_data(
        self,
        pipe: mrsm.Pipe,
        backtrack_minutes: int = 0,
        begin: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kwargs: Any
    ) -> 'pd.DataFrame':
    """
    Return the most recent interval of data leading up to `begin` (defaults to the sync time).

    Parameters
    ----------
    pipe: mrsm.Pipe,
        The number of minutes leading up to `begin` from which to search.
        If `begin` is an integer, then subtract this value from `begin`.

    backtrack_minutes: int, default 0
        The number of minutes leading up to `begin` from which to search.
        If `begin` is an integer, then subtract this value from `begin`.

    begin: Union[datetime, int, None], default None
        The point from which to begin backtracking.
        If `None`, then use the pipe's sync time (most recent datetime value).

    params: Optional[Dict[str, Any]], default None
        Additional filter parameters.

    Returns
    -------
    A Pandas DataFrame for the interval of size `backtrack_minutes` leading up to `begin`.
    """
    from datetime import datetime, timedelta

    if begin is None:
        begin = pipe.get_sync_time(params=params, debug=debug)

    backtrack_interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(begin, datetime)
        else backtrack_minutes
    )

    if begin is not None:
        begin = begin - backtrack_interval

    return self.get_pipe_data(
        pipe,
        begin = begin,
        params = params,
        debug = debug,
        **kwargs
    )

register_pipe()

Store a pipe's attributes in a pipes table.

The attributes row of a pipe includes the pipe's keys (immutable) and parameters dictionary (mutable):

  • connector_keys (str)
  • metric_key (str)
  • location_key (Union[str, None])
    You may store "None" in place of None.
  • parameters (Dict[str, Any])
    You can access the in-memory parameters with pipe._attributes.get('parameters', {}).
def register_pipe():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def register_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Insert the pipe's attributes into the internal `pipes` table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be registered.

    Returns
    -------
    A `SuccessTuple` of the result.
    """
    attributes = {
        'connector_keys': str(pipe.connector_keys),
        'metric_key': str(pipe.connector_key),
        'location_key': str(pipe.location_key),
        'parameters': pipe._attributes.get('parameters', {}),
    }

    ### TODO insert `attributes` as a row in the pipes table.
    # self.pipes_collection.insert_one(attributes)

    return True, "Success"

get_pipe_attributes()

Return the attributes dictionary for a pipe (see register_pipe() above). Note that a pipe's attributes must be JSON-serializable, so objects like MongoDB's ObjectId must be converted to strings.

def get_pipe_attributes():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def get_pipe_attributes(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> Dict[str, Any]:
    """
    Return the pipe's document from the internal `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose attributes should be retrieved.

    Returns
    -------
    The document that matches the keys of the pipe.
    """
    query = {
        'connector_keys': str(pipe.connector_keys,
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
    }
    ### TODO query the `pipes` table either using these keys or `get_pipe_id()`.
    result = {}
    # result = self.pipes_collection.find_one(query) or {}
    return result

get_pipe_id()

Return the ID tied to the pipe's connector, metric, and location keys.

def get_pipe_id():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def get_pipe_id(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> Union[str, int, None]:
    """
    Return the `_id` for the pipe if it exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `_id` to fetch.

    Returns
    -------
    The `_id` for the pipe's document or `None`.
    """
    query = {
        'connector_keys': str(pipe.connector_keys,
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
    }
    ### TODO fetch the ID mapped to this pipe.
    # oid = (self.pipes_collection.find_one(query, {'_id': 1}) or {}).get('_id', None)
    # return str(oid) if oid is not None else None

edit_pipe()

Update the parameters dictionary of a pipe's registration.

def edit_pipe():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def edit_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Edit the attributes of the pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose in-memory parameters must be persisted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    query = {
        'connector_keys': str(pipe.connector_keys,
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
    }
    pipe_parameters = pipe._attributes.get('parameters', {})
    ### TODO Update the row with new parameters.
    # self.pipes_collection.update_one(query, {'$set': {'parameters': pipe_parameters}})
    return True, "Success"

delete_pipe()

Delete a pipe's registration from the pipes table.

def delete_pipe():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def delete_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Delete a pipe's registration from the `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    drop_success, drop_message = pipe.drop(debug=debug)
    if not drop_success:
        return drop_success, drop_message

    pipe_id = self.get_pipe_id(pipe, debug=debug)
    if pipe_id is None:
        return False, f"{pipe} is not registered."

    ### TODO Delete the pipe's row from the pipes table.
    # self.pipes_collection.delete_one({'_id': pipe_id})
    return True, "Success"

fetch_pipes_keys()

Return a list of tuples for the registered pipes' keys according to the provided filters.

Each filter should only be applied if the given list is not empty. Values within filters are joined by OR, and filters are joined by AND.

The function separate_negation_values() returns two sublists: regular values (IN) and values preceded by an underscore (NOT IN).

def fetch_pipes_keys():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        debug: bool = False,
        **kwargs: Any
    ) -> List[Tuple[str, str, str]]:
    """
    Return a list of tuples for the registered pipes' keys according to the provided filters.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The keys passed via `-c`.

    metric_keys: Optional[List[str]], default None
        The keys passed via `-m`.

    location_keys: Optional[List[str]], default None
        The keys passed via `-l`.

    tags: Optional[List[str]], default None
        Tags passed via `--tags` which are stored under `parameters:tags`.

    Returns
    -------
    A list of connector, metric, and location keys in tuples.
    You may return the string "None" for location keys in place of nulls.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> conn = mrsm.get_connector('example:demo')
    >>> 
    >>> pipe_a = mrsm.Pipe('a', 'demo', tags=['foo'], instance=conn)
    >>> pipe_b = mrsm.Pipe('b', 'demo', tags=['bar'], instance=conn)
    >>> pipe_a.register()
    >>> pipe_b.register()
    >>> 
    >>> conn.fetch_pipes_keys(['a', 'b'])
    [('a', 'demo', 'None'), ('b', 'demo', 'None')]
    >>> conn.fetch_pipes_keys(metric_keys=['demo'])
    [('a', 'demo', 'None'), ('b', 'demo', 'None')]
    >>> conn.fetch_pipes_keys(tags=['foo'])
    [('a', 'demo', 'None')]
    >>> conn.fetch_pipes_keys(location_keys=[None])
    [('a', 'demo', 'None'), ('b', 'demo', 'None')]
    """
    from meerschaum.utils.misc import separate_negation_values

    in_ck, nin_ck = separate_negation_values([str(val) for val in (connector_keys or [])])
    in_mk, nin_mk = separate_negation_values([str(val) for val in (metric_keys or [])])
    in_lk, nin_lk = separate_negation_values([str(val) for val in (location_keys or [])])
    in_tags, nin_tags = separate_negation_values([str(val) for val in (tags or [])])

    ### TODO build a query like so, only including clauses if the given list is not empty.
    ### The `tags` clause is an OR ("?|"), meaning any of the tags may match.
    ### 
    ### 
    ### SELECT connector_keys, metric_key, location_key
    ### FROM pipes
    ### WHERE connector_keys IN ({in_ck})
    ###   AND connector_keys NOT IN ({nin_ck})
    ###   AND metric_key IN ({in_mk})
    ###   AND metric_key NOT IN ({nin_mk})
    ###   AND location_key IN (in_lk)
    ###   AND location_key NOT IN (nin_lk)
    ###   AND (parameters->'tags')::JSONB ?| ARRAY[{tags}]
    ###   AND NOT (parameters->'tags')::JSONB ?| ARRAY[{nin_tags}]
    return []

pipe_exists()

Return True if the target table exists and has data.

def pipe_exists():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
def pipe_exists(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> bool:
    """
    Check whether a pipe's target table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to check whether its table exists.

    Returns
    -------
    A `bool` indicating the table exists.
    """
    table_name = pipe.target
    ### TODO write a query to determine the existence of `table_name`.
    table_exists = False
    return table_exists

drop_pipe()

Drop the pipe's target table.

def drop_pipe():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def drop_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Drop a pipe's collection if it exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be dropped.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    if not pipe.exists(debug=debug):
        return True, "Success"

    ### TODO write a query to drop `table_name`.
    table_name = pipe.target
    return True, "Success"

sync_pipe()

Upsert new data into the pipe's table.

You may use the built-in method pipe.filter_existing() to extract inserts and updates in case the database for this connector does not have upsert functionality.

The values of the pipe.columns dictionary are immutable indices to be used for upserts. You may improve performance by indexing these columns after an initial sync (i.e. pipe.exists() is False).

def sync_pipe():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def sync_pipe(
        self,
        pipe: mrsm.Pipe,
        df: 'pd.DataFrame' = None,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Upsert new documents into the pipe's collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose collection should receive the new documents.

    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
        The data to be synced.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    if df is None:
        return False, f"Received `None`, cannot sync {pipe}."

    ### TODO Write the upsert logic for the target table.
    ### `pipe.filter_existing()` is provided for your convenience to
    ### remove duplicates and separate inserts from updates.
    unseen_df, update_df, delta_df = pipe.filter_existing(df, debug=debug)
    return True, "Success"

sync_pipe_inplace() (optional)

For situations where the source and instance connectors are the same, the method sync_pipe_inplace() allows you to bypass loading DataFrames into RAM and instead handle the syncs remotely. See the SQLConnector.sync_pipe_inplace() method for reference.

clear_pipe()

Delete a pipe's data within a bounded or unbounded interval without dropping the table:

def clear_pipe():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def clear_pipe(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Delete rows within `begin`, `end`, and `params`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to clear.

    begin: Union[datetime, int, None], default None
        If provided, remove rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, remove rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only remove rows which match the `params` filter.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    ### TODO Write a query to remove rows which match `begin`, `end`, and `params`.
    return True, "Success"

deduplicate_pipe() (optional)

Like sync_pipe_inplace(), you may choose to implement deduplicate_pipe() for a performance boost. Otherwise, the default implementation relies upon get_pipe_data(), clear_pipe(), and get_pipe_rowcount(). See the SQLConnector.deduplicate_pipe() method for reference.

get_pipe_data()

Return the target table's data according to the filters.

The begin and end arguments correspond to the designated datetime axis (pipe.columns['datetime']).

The params argument behaves the same as fetch_pipes_keys() filters but may allow single values as well. See the disclaimer at the top of this page on building queries with params.

The convenience function parse_df_datetimes() casts dataframe-like lists of dictionaries (or dictionaries of lists) into DataFrames, automatically casting ISO strings to datetimes.

def get_pipe_data():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_pipe_data(
        self,
        pipe: mrsm.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kwargs: Any
    ) -> Union['pd.DataFrame', None]:
    """
    Query a pipe's target table and return the DataFrame.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        The earliest `datetime` value to search from (inclusive).

    end: Union[datetime, int, None], default None
        The lastest `datetime` value to search from (exclusive).

    params: Optional[Dict[str, str]], default None
        Additional filters to apply to the query.

    Returns
    -------
    The target table's data as a DataFrame.
    """
    if not pipe.exists(debug=debug):
        return None

    table_name = pipe.target
    dt_col = pipe.columns.get("datetime", None)

    ### TODO Write a query to fetch from `table_name`
    ###      and apply the filters `begin`, `end`, and `params`.
    ### 
    ###      To improve performance, add logic to only read from
    ###      `select_columns` and not `omit_columns` (if provided).
    ### 
    ### SELECT {', '.join(cols_to_select)}
    ### FROM "{table_name}"
    ### WHERE "{dt_col}" >= '{begin}'
    ###   AND "{dt_col}" <  '{end}'

    ### The function `parse_df_datetimes()` is a convenience function
    ### to cast a list of dictionaries into a DataFrame and convert datetime columns.
    from meerschaum.utils.misc import parse_df_datetimes
    rows = []
    return parse_df_datetimes(rows)

get_sync_time()

Return the largest (or smallest) value in target table, according to the params filter.

def get_sync_time():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def get_sync_time(
        self,
        pipe: mrsm.Pipe,
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        debug: bool = False,
        **kwargs: Any
    ) -> Union[datetime, int, None]:
    """
    Return the most recent value for the `datetime` axis.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose collection contains documents.

    params: Optional[Dict[str, Any]], default None
        Filter certain parameters when determining the sync time.

    newest: bool, default True
        If `True`, return the maximum value for the column.

    Returns
    -------
    The largest `datetime` or `int` value of the `datetime` axis. 
    """
    dt_col = pipe.columns.get('dt_col', None)
    if dt_col is None:
        return None

    ### TODO write a query to get the largest value for `dt_col`.
    ### If `newest` is `False`, return the smallest value.
    ### Apply the `params` filter in case of multiplexing.

get_pipe_columns_types()

Return columns and Pandas data types (you may also return PosgreSQL-style types). You may take advantage of automatic dtype enforcement by implementing this method.

Example
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def get_pipe_columns_types(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kwargs: Any
    ) -> Dict[str, str]:
    """
    Return the data types for the columns in the target table for data type enforcement.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table contains columns and data types.

    Returns
    -------
    A dictionary mapping columns to data types.
    """
    if not pipe.exists(debug=debug):
        return {}

    table_name = pipe.target
    ### TODO write a query to fetch the columns contained in `table_name`.
    columns_types = {}

    ### Return a dictionary mapping the columns
    ### to their Pandas dtypes, e.g.:
    ### `{'foo': 'int64'`}`
    return columns_types

get_pipe_rowcount()

Return the number of rows in the pipe's target table within the begin, end, and params bounds:

def get_pipe_rowcount():
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def get_pipe_rowcount(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kwargs: Any
    ) -> int:
    """
    Return the rowcount for the pipe's table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table should be counted.

    begin: Union[datetime, int, None], default None
        If provided, only count rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, only count rows < `end`.

    params: Optional[Dict[str, Any]]
        If provided, only count rows that match the `params` filter.

    Returns
    -------
    The rowcount for this pipe's table according the given parameters.
    """
    ### TODO write a query to count how many rows exist in `table_name` according to the filters.
    table_name = pipe.target
    count = 0
    return count