Getting Started
Requirements
Ferdelance is a library written with Python 3.10
.
Installation
To install a node or a workbench, just install the whole library:
pip install ferdelance
Example
# %%
from ferdelance.core.distributions import Collect
from ferdelance.core.estimators import GroupCountEstimator, MeanEstimator
from ferdelance.core.model_operations import Aggregation, Train, TrainTest
from ferdelance.core.models import FederatedRandomForestClassifier, StrategyRandomForestClassifier
from ferdelance.core.steps import Finalize, Parallel
from ferdelance.core.transformers import FederatedSplitter, FederatedKBinsDiscretizer
from ferdelance.schemas.workbench import WorkbenchResource
from ferdelance.workbench import (
Context,
Project,
Client,
Artifact,
ArtifactStatus,
DataSource,
)
import numpy as np
import json
# %% create the context
ctx = Context("http://localhost:1456")
# %% load a project given a token
project_token = "58981bcbab77ef4b8e01207134c38873e0936a9ab88cd76b243a2e2c85390b94"
project: Project = ctx.project(project_token)
# %% What is this project?
print(project)
# %% (for DEBUG) ask the context for clients of a project
clients: list[Client] = ctx.clients(project)
for c in clients:
print(c)
# %% (for DEBUG) ask the context for data source of a project
datasources: list[DataSource] = ctx.datasources(project)
for datasource in datasources:
print(datasource) # <--- non aggregated
# %% working with data
ds = project.data # <--- AggregatedDataSource
print(ds)
# this is like a describe, but per single feature
for feature in ds.features:
print(feature)
# %% develop a filter query
# prepare transformation query with all features
q = project.extract()
# inspect a feature data type
feature = q["variety"]
print(feature)
# add filter to the extraction query
q = q.add(q["variety"] < 2)
# %% add transformer
q = q.add(
FederatedKBinsDiscretizer(
features_in=[q["variety"]],
features_out=[q["variety_discr"]],
)
)
# %% statistics 1
gc = GroupCountEstimator(
query=q,
by=["variety_discr"],
features=["variety_discr"],
)
ret = ctx.submit(project, gc.get_steps())
# %% statistics 2
me = MeanEstimator(query=q)
ret = ctx.submit(project, me.get_steps())
# %% prepare the model steps
model = FederatedRandomForestClassifier(
n_estimators=10,
strategy=StrategyRandomForestClassifier.MERGE,
)
label = "MedHouseValDiscrete"
steps = [
Parallel(
TrainTest(
query=project.extract().add(
FederatedSplitter(
random_state=42,
test_percentage=0.2,
label=label,
)
),
trainer=Train(model=model),
model=model,
),
Collect(),
),
Finalize(
Aggregation(model=model),
),
]
# %% submit the task to the node, it will be converted to an Artifact
a: Artifact = ctx.submit(project, steps)
print(json.dumps(a.model_dump(), indent=True)) # view execution plan
# %% monitor learning progress
status: ArtifactStatus = ctx.status(a)
print(status)
# %% list produced resources
resources: list[WorkbenchResource] = ctx.list_resources(a)
for r in resources:
print(r.resource_id, r.creation_time, r.is_ready)
# %% get latest produced resource (the result)
agg_model: FederatedRandomForestClassifier = ctx.get_latest_resource(a)["model"]
# %%
print(agg_model.predict(np.array([[0, 0, 0, 0]])))
print(agg_model.predict(np.array([[1, 1, 1, 1]])))