Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Implicit plugins #900

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion js/core/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import path from 'path';
import { FlowStateStore } from './flowTypes.js';
import { LocalFileFlowStateStore } from './localFileFlowStateStore.js';
import { logger } from './logging.js';
import { PluginProvider } from './plugin.js';
import { PluginProvider, PluginProvidesType } from './plugin.js';
import * as registry from './registry.js';
import { AsyncProvider } from './registry.js';
import {
Expand Down Expand Up @@ -111,6 +111,9 @@ class Config {
logger.info(`Initializing plugin ${plugin.name}:`);
return await plugin.initializer();
},
provides() {
return plugin.provides();
},
});
});

Expand All @@ -120,6 +123,23 @@ class Config {
logger.debug(` - all environments: ${loggerPluginName}`);
this.loggerConfig = async () =>
this.resolveLoggerConfig(loggerPluginName);
} else {
const capablePlugins = registry.lookupPluginsByAbility(
PluginProvidesType.TELEMETRY
);
if (capablePlugins) {
if (capablePlugins.length != 1) {
logger.error(`More than one plugin implicitly provides telemetry.`);
logger.error(
`Disambiguate by adding a telemetry {} block to your configuration and specifying one of ${capablePlugins.map((plugin) => plugin.name)}.`
);
} else {
logger.debug('Registering logging exporters...');
logger.debug(` - all environments: ${capablePlugins[0].name}`);
this.loggerConfig = async () =>
this.resolveLoggerConfig(capablePlugins[0].name);
}
}
}

if (this.options.telemetry?.instrumentation) {
Expand All @@ -128,6 +148,23 @@ class Config {
logger.debug(` - all environments: ${telemetryPluginName}`);
this.telemetryConfig = async () =>
this.resolveTelemetryConfig(telemetryPluginName);
} else {
const capablePlugins = registry.lookupPluginsByAbility(
PluginProvidesType.TELEMETRY
);
if (capablePlugins) {
if (capablePlugins.length != 1) {
logger.error(`More than one plugin implicitly provides telemetry.`);
logger.error(
`Disambiguate by adding a telemetry {} block to your configuration and specifying one of ${capablePlugins.map((plugin) => plugin.name)}.`
);
} else {
logger.debug('Registering telemetry exporters...');
logger.debug(` - all environments: ${capablePlugins[0].name}`);
this.telemetryConfig = async () =>
this.resolveTelemetryConfig(capablePlugins[0].name);
}
}
}

logger.debug('Registering flow state stores...');
Expand Down
17 changes: 16 additions & 1 deletion js/core/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,25 @@ export interface Provider<T> {
value: T;
}

export enum PluginProvidesType {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this is wrong, will fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to work as a bitmask

UNSPECIFIED = 0x0,
MODEL = 0x1,
RETRIEVER = 0x2,
EMBEDDER = 0x3,
INDEXER = 0x4,
EVALUATOR = 0x5,
FLOW_STATE_STORE = 0x6,
TRACE_STORE = 0x7,
TELEMETRY = 0x8,
}

export interface PluginProvider {
name: string;
initializer: () =>
| InitializedPlugin
| void
| Promise<InitializedPlugin | void>;
provides: () => PluginProvidesType;
}

export interface InitializedPlugin {
Expand Down Expand Up @@ -58,7 +71,8 @@ export type Plugin<T extends any[]> = (...args: T) => PluginProvider;
*/
export function genkitPlugin<T extends PluginInit>(
pluginName: string,
initFn: T
initFn: T,
providesFn: () => PluginProvidesType = () => PluginProvidesType.UNSPECIFIED
): Plugin<Parameters<T>> {
return (...args: Parameters<T>) => ({
name: pluginName,
Expand All @@ -67,6 +81,7 @@ export function genkitPlugin<T extends PluginInit>(
validatePluginActions(pluginName, initializedPlugin);
return initializedPlugin;
},
provides: () => providesFn(),
});
}

Expand Down
26 changes: 24 additions & 2 deletions js/core/src/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import * as z from 'zod';
import { Action } from './action.js';
import { FlowStateStore } from './flowTypes.js';
import { logger } from './logging.js';
import { PluginProvider } from './plugin.js';
import { PluginProvider, PluginProvidesType } from './plugin.js';
import { startReflectionApi } from './reflectionApi.js';
import { JSONSchema } from './schema.js';
import { TraceStore } from './tracing/types.js';
Expand All @@ -29,6 +29,7 @@ const ACTIONS_BY_ID = 'genkit__ACTIONS_BY_ID';
const TRACE_STORES_BY_ENV = 'genkit__TRACE_STORES_BY_ENV';
const FLOW_STATE_STORES_BY_ENV = 'genkit__FLOW_STATE_STORES_BY_ENV';
const PLUGINS_BY_NAME = 'genkit__PLUGINS_BY_NAME';
const PLUGINS_BY_ABILITY = 'genkit__PLUGINS_BY_ABILITY';
const SCHEMAS_BY_NAME = 'genkit__SCHEMAS_BY_NAME';

function actionsById(): Record<string, Action<z.ZodTypeAny, z.ZodTypeAny>> {
Expand All @@ -55,6 +56,12 @@ function pluginsByName(): Record<string, PluginProvider> {
}
return global[PLUGINS_BY_NAME];
}
function pluginsByAbility(): Record<PluginProvidesType, PluginProvider[]> {
if (global[PLUGINS_BY_ABILITY] === undefined) {
global[PLUGINS_BY_ABILITY] = {};
}
return global[PLUGINS_BY_ABILITY];
}
function schemasByName(): Record<
string,
{ schema?: z.ZodTypeAny; jsonSchema?: JSONSchema }
Expand Down Expand Up @@ -196,7 +203,8 @@ export async function lookupFlowStateStore(
*/
export function registerPluginProvider(name: string, provider: PluginProvider) {
let cached;
pluginsByName()[name] = {

const plugin = {
name: provider.name,
initializer: () => {
if (cached) {
Expand All @@ -205,13 +213,27 @@ export function registerPluginProvider(name: string, provider: PluginProvider) {
cached = provider.initializer();
return cached;
},
provides: () => {
return provider.provides();
},
};

pluginsByName()[plugin.name] = plugin;

if (plugin.provides() !== PluginProvidesType.UNSPECIFIED) {
const plugins = pluginsByAbility()[plugin.provides()] || [];
pluginsByAbility()[plugin.provides()] = [...plugins, plugin];
}
}

export function lookupPlugin(name: string) {
return pluginsByName()[name];
}

export function lookupPluginsByAbility(ability: PluginProvidesType) {
return pluginsByAbility()[ability];
}

/**
*
*/
Expand Down
9 changes: 8 additions & 1 deletion js/plugins/google-cloud/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import { genkitPlugin, Plugin } from '@genkit-ai/core';
import { genkitPlugin, Plugin, PluginProvidesType } from '@genkit-ai/core';
import { credentialsFromEnvironment } from './auth.js';
import { GcpLogger } from './gcpLogger.js';
import { GcpOpenTelemetry } from './gcpOpenTelemetry.js';
Expand All @@ -29,6 +29,13 @@ export const googleCloud: Plugin<[GcpPluginOptions] | []> = genkitPlugin(
async (options?: GcpPluginOptions) => build(options)
);

export const googleCloudWithTelemetry: Plugin<[GcpPluginOptions] | []> =
genkitPlugin(
'googleCloudWithTelemetry',
async (options?: GcpPluginOptions) => build(options),
() => PluginProvidesType.TELEMETRY
);

/**
* Configures and builds the plugin.
* Not normally needed, but exposed for use by the firebase plugin.
Expand Down
2 changes: 1 addition & 1 deletion js/plugins/google-cloud/src/telemetry/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class FlowsTelemetry implements Telemetry {
}

if (output && logIO) {
this.recordIO(span, 'Output', name, path, input, projectId);
this.recordIO(span, 'Output', name, path, output, projectId);
}

if (state === 'success') {
Expand Down
39 changes: 19 additions & 20 deletions js/testapps/flow-simple-ai/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { configureGenkit } from '@genkit-ai/core';
import { dotprompt, prompt } from '@genkit-ai/dotprompt';
import { defineFirestoreRetriever, firebase } from '@genkit-ai/firebase';
import { defineFlow, run } from '@genkit-ai/flow';
import { googleCloud } from '@genkit-ai/google-cloud';
import { googleCloudWithTelemetry } from '@genkit-ai/google-cloud';
import {
gemini15Flash,
googleAI,
Expand All @@ -32,7 +32,6 @@ import {
textEmbeddingGecko,
vertexAI,
} from '@genkit-ai/vertexai';
import { AlwaysOnSampler } from '@opentelemetry/sdk-trace-base';
import { initializeApp } from 'firebase-admin/app';
import { getFirestore } from 'firebase-admin/firestore';
import { Allow, parse } from 'partial-json';
Expand All @@ -43,33 +42,33 @@ configureGenkit({
firebase(),
googleAI(),
vertexAI(),
googleCloud({
googleCloudWithTelemetry({
// These are configured for demonstration purposes. Sensible defaults are
// in place in the event that telemetryConfig is absent.
telemetryConfig: {
// Forces telemetry export in 'dev'
forceDevExport: true,
sampler: new AlwaysOnSampler(),
autoInstrumentation: true,
autoInstrumentationConfig: {
'@opentelemetry/instrumentation-fs': { enabled: false },
'@opentelemetry/instrumentation-dns': { enabled: false },
'@opentelemetry/instrumentation-net': { enabled: false },
},
metricExportIntervalMillis: 5_000,
metricExportTimeoutMillis: 5_000,
},
// telemetryConfig: {
// // Forces telemetry export in 'dev'
// forceDevExport: true,
// sampler: new AlwaysOnSampler(),
// autoInstrumentation: true,
// autoInstrumentationConfig: {
// '@opentelemetry/instrumentation-fs': { enabled: false },
// '@opentelemetry/instrumentation-dns': { enabled: false },
// '@opentelemetry/instrumentation-net': { enabled: false },
// },
// metricExportIntervalMillis: 5_000,
// metricExportTimeoutMillis: 5_000,
// },
}),
dotprompt(),
],
flowStateStore: 'firebase',
traceStore: 'firebase',
enableTracingAndMetrics: true,
logLevel: 'debug',
telemetry: {
instrumentation: 'googleCloud',
logger: 'googleCloud',
},
// telemetry: {
// instrumentation: 'googleCloud',
// logger: 'googleCloud',
// },
});

const app = initializeApp();
Expand Down
Loading