-
Notifications
You must be signed in to change notification settings - Fork 0
/
hydra_minirocket_classifier.py
161 lines (126 loc) · 4.93 KB
/
hydra_minirocket_classifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import numpy as np
import torch
from sklearn.linear_model import RidgeClassifierCV
from sklearn.metrics import accuracy_score, make_scorer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sktime.classification import BaseClassifier
from sktime.transformations.panel.rocket import Rocket
from hydra_multivariate import HydraMultivariate
from rocket import MiniRocketTransformer
class HydraMultivariateClassifier(BaseClassifier):
    """sktime classifier chaining HydraMultivariate, StandardScaler and a classifier.

    The input panel is converted to a float32 torch tensor on the selected
    device, transformed by ``HydraMultivariate``, scaled without mean
    centering, and passed to ``clf``.
    """

    _tags = {
        "X_inner_mtype": "numpy3D",  # which type do _fit/_predict, support for X?
        # it should be either "numpy3D" or "nested_univ" (nested pd.DataFrame)
        "capability:multivariate": True,
    }

    def __init__(self, k=8, g=64, clf=None, batch_size=1024, verbose=0):
        """
        Parameters
        ----------
        k : int
            Forwarded to HydraMultivariate as ``k``.
        g : int
            Forwarded to HydraMultivariate as ``g``.
        clf : sklearn classifier
            Classifier to use. If None, RidgeClassifierCV is used.
        batch_size : int
            Forwarded to HydraMultivariate as ``batch_size``.
        verbose : int
            Verbosity level. Values above 0 print the pipeline during fit.
        """
        super().__init__()
        self.clf = clf
        self.k = k
        self.g = g
        # Use the GPU when one is visible to torch, otherwise stay on CPU.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.verbose = verbose
        self.batch_size = batch_size

    def _as_device_tensor(self, X):
        """Return ``X`` as a float32 torch tensor located on ``self.device``.

        Shared by ``_fit`` and ``_predict`` so both feed the transformer the
        same dtype; previously ``_predict`` skipped the float cast.
        """
        # isinstance (rather than an exact type check) lets Tensor subclasses
        # through instead of sending them to torch.from_numpy.
        if not isinstance(X, torch.Tensor):
            X = torch.from_numpy(X)
        return X.float().to(self.device)

    def _fit(self, X, y):
        """Build the transform/scale/classify pipeline and fit it on ``X``, ``y``.

        Returns
        -------
        self
        """
        X = self._as_device_tensor(X)
        if self.clf is None:
            # Default classifier: ridge with alpha chosen by 10-fold CV on accuracy.
            scorer = make_scorer(accuracy_score, greater_is_better=True)
            self.clf = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10), scoring=scorer, cv=10)
        # The last and second axes of X are handed to HydraMultivariate, in
        # that order (presumably series length and channel count for a
        # numpy3D panel - confirm against HydraMultivariate's signature).
        transformer = HydraMultivariate(
            X.shape[-1],
            X.shape[1],
            k=self.k,
            g=self.g,
            batch_size=self.batch_size,
        ).to(self.device)
        self.pipeline = make_pipeline(
            transformer,
            StandardScaler(with_mean=False),
            self.clf,
        )
        if self.verbose > 0:
            print(self.pipeline)
        self.pipeline.fit(X, y)
        return self

    def _predict(self, X):
        """Return the fitted pipeline's predictions for ``X``."""
        return self.pipeline.predict(self._as_device_tensor(X))
class MiniRocketMultivariateClassifier(BaseClassifier):
    """sktime classifier chaining MiniRocketTransformer, StandardScaler and a classifier.

    The input panel is converted to a float32 torch tensor on the selected
    device, transformed by ``MiniRocketTransformer``, scaled without mean
    centering, and passed to ``clf``.
    """

    _tags = {
        "X_inner_mtype": "numpy3D",  # which type do _fit/_predict, support for X?
        # it should be either "numpy3D" or "nested_univ" (nested pd.DataFrame)
        "capability:multivariate": True,
    }

    def __init__(self, num_features=10_000, clf=None, chunksize=1024, verbose=0):
        """
        Parameters
        ----------
        num_features : int
            Forwarded to MiniRocketTransformer as ``num_features``.
        clf : sklearn classifier
            Classifier to use. If None, RidgeClassifierCV is used.
        chunksize : int
            Forwarded to MiniRocketTransformer as ``chunksize``.
        verbose : int
            Verbosity level. Values above 0 print the pipeline during fit.
        """
        super().__init__()
        self.clf = clf
        self.num_features = num_features
        # Use the GPU when one is visible to torch, otherwise stay on CPU.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.chunksize = chunksize
        self.verbose = verbose

    def _as_device_tensor(self, X):
        """Return ``X`` as a float32 torch tensor located on ``self.device``."""
        # isinstance (rather than an exact type check) lets Tensor subclasses
        # through instead of sending them to torch.from_numpy.
        if not isinstance(X, torch.Tensor):
            X = torch.from_numpy(X)
        return X.float().to(self.device)

    def _fit(self, X, y):
        """Build the transform/scale/classify pipeline and fit it on ``X``, ``y``.

        Returns
        -------
        self
        """
        X = self._as_device_tensor(X)
        if self.clf is None:
            # Default classifier: ridge with alpha chosen by 10-fold CV on accuracy.
            scorer = make_scorer(accuracy_score, greater_is_better=True)
            self.clf = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10), scoring=scorer, cv=10)
        # The second and last axes of X are handed to MiniRocketTransformer,
        # in that order (presumably channel count and series length for a
        # numpy3D panel - confirm against MiniRocketTransformer's signature).
        transformer = MiniRocketTransformer(
            X.shape[1],
            X.shape[-1],
            num_features=self.num_features,
            chunksize=self.chunksize,
        ).to(self.device)
        self.pipeline = make_pipeline(
            transformer,
            StandardScaler(with_mean=False),
            self.clf,
        )
        if self.verbose > 0:
            print(self.pipeline)
        self.pipeline.fit(X, y)
        return self

    def _predict(self, X):
        """Return the fitted pipeline's predictions for ``X``."""
        return self.pipeline.predict(self._as_device_tensor(X))
class RocketMultivariateClassifier(BaseClassifier):
    """sktime classifier chaining sktime's Rocket, StandardScaler and a classifier.

    Unlike the torch-based classifiers in this module, the input is passed to
    the pipeline as received; no tensor conversion is performed here.
    """

    _tags = {
        "X_inner_mtype": "numpy3D",  # which type do _fit/_predict, support for X?
        # it should be either "numpy3D" or "nested_univ" (nested pd.DataFrame)
        "capability:multivariate": True,
    }

    def __init__(self, num_features=10_000, clf=None, chunksize=1024, verbose=0, n_jobs=5):
        """
        Parameters
        ----------
        num_features : int
            Forwarded to Rocket as ``num_kernels``.
        clf : sklearn classifier
            Classifier to use. If None, RidgeClassifierCV is used.
        chunksize : int
            Stored on the instance but not read by this class; kept so the
            constructor signature matches MiniRocketMultivariateClassifier.
        verbose : int
            Verbosity level. Values above 0 print the pipeline during fit.
        n_jobs : int
            Forwarded to Rocket as ``n_jobs``. Defaults to 5, the value that
            was previously hard-coded.
        """
        super().__init__()
        self.clf = clf
        self.num_features = num_features
        self.chunksize = chunksize
        self.verbose = verbose
        self.n_jobs = n_jobs

    def _fit(self, X, y):
        """Build the transform/scale/classify pipeline and fit it on ``X``, ``y``.

        Returns
        -------
        self
        """
        if self.clf is None:
            # Default classifier: ridge with alpha chosen by 10-fold CV on accuracy.
            scorer = make_scorer(accuracy_score, greater_is_better=True)
            self.clf = RidgeClassifierCV(alphas=np.logspace(-3, 3, 10), scoring=scorer, cv=10)
        self.pipeline = make_pipeline(
            Rocket(num_kernels=self.num_features, n_jobs=self.n_jobs),
            StandardScaler(with_mean=False),
            self.clf,
        )
        if self.verbose > 0:
            print(self.pipeline)
        self.pipeline.fit(X, y)
        return self

    def _predict(self, X):
        """Return the fitted pipeline's predictions for ``X``."""
        return self.pipeline.predict(X)