Writing Plugins
The Steampipe Plugin SDK makes writing tables fast, easy, and fun! Most of the heavy lifting is taken care of for you — just define your tables and columns, wire up a few API calls, and you can start to query your service with standard SQL!
While this document will provide an introduction and some examples, note that Steampipe is an evolving, open source project - refer to the code as the authoritative source, as well as for real-world examples.
Also, please try to be a good community citizen — following the standards makes for a better, more consistent experience for end-users and developers alike.
Let's get started!
- The Basics
- Implementing Tables
- Hydrate Functions
- Client-Side Rate Limiting
- Function Tags
- Accounting for Paged List calls
- Logging
- Installing and Testing Your Plugin
The Basics
main.go
The main
function in then main.go
is the entry point for your plugin. This function must call plugin.Serve
from the plugin sdk to instantiate your plugin gRPC server. You will pass the plugin function that you will create in the plugin.go file:
Example: main.go
package mainimport ("github.com/turbot/steampipe-plugin-sdk/v5/plugin""github.com/turbot/steampipe-plugin-zendesk/zendesk")func main() {plugin.Serve(&plugin.ServeOpts{PluginFunc: zendesk.Plugin})}
plugin.go
The plugin.go
file should implement a single Plugin Definition (Plugin()
function) that returns a pointer to a Plugin
to be loaded by the gRPC server.
By convention, the package name for your plugin should be the same name as your plugin, and go files for your plugin (except main.go
) should reside in a folder with the same name.
Example: plugin.go
package zendeskimport ("context""github.com/turbot/steampipe-plugin-sdk/plugin""github.com/turbot/steampipe-plugin-sdk/plugin/transform")func Plugin(ctx context.Context) *plugin.Plugin {p := &plugin.Plugin{Name: "steampipe-plugin-zendesk",DefaultTransform: transform.FromGo().NullIfZero(),TableMap: map[string]*plugin.Table{"zendesk_brand": tableZendeskBrand(),"zendesk_group": tableZendeskGroup(),"zendesk_organization": tableZendeskOrganization(),"zendesk_search": tableZendeskSearch(),"zendesk_ticket": tableZendeskTicket(),"zendesk_ticket_audit": tableZendeskTicketAudit(),"zendesk_trigger": tableZendeskTrigger(),"zendesk_user": tableZendeskUser(),},}return p}
Plugin Definition
Argument | Description |
---|---|
Name | The name of the plugin (steampipe-plugin-{plugin name} ). |
TableMap | A map of table names to Table definitions. |
DefaultTransform | A default Transform Function to be used when one is not specified. While not required, this may save quite a bit of repeated code. |
DefaultGetConfig | Provides an optional mechanism for providing plugin-level defaults to a get config. This is merged with the GetConfig defined in the table and/or columns. Typically, this is used to standardize error handling with ShouldIgnoreError . |
SchemaMode | Specifies if the schema should be checked and re-imported if changed every time Steampipe starts. This can be set to dynamic or static . Defaults to static . |
RequiredColumns | An optional list of columns that ALL tables in this plugin MUST implement. |
Table Definition
The plugin.Table
may specify:
Argument | Description |
---|---|
Name | The name of the table. |
Description | A short description, added as a comment on the table and used in help commands and documentation. |
Columns | An array of column definitions. |
List | A List Config definition, used to fetch the data items used to build all rows of a table. |
Get | A Get Config definition, used to fetch a single item. |
DefaultTransform | A default transform function to be used when one is not specified. If set, this will override the default set in the plugin definition. |
HydrateDependencies | Definitions of dependencies between hydrate functions (for cases where a hydrate function needs the results of another hydrate function). |
List Config
A ListConfig definition defines how to list all rows of a table.
Argument | Description |
---|---|
KeyColumns | An optional list of columns that require a qualifier in order to list data for this table. |
Hydrate | A hydrate function which is called first when performing a 'list' call. |
ParentHydrate | An optional parent list function - if you list items with a parent-child relationship, this will list the parent items. |
Get Config
A GetConfig definition defines how to get a single row of a table.
Argument | Description |
---|---|
KeyColumns | A list of keys which are used to uniquely identify rows - used to determine whether a query is a 'get' call. |
ItemFromKey [DEPRECATED] ] | This property is deprecated. |
Hydrate | A hydrate function which is called first when performing a 'get' call. If this returns 'not found', no further hydrate functions are called. |
ShouldIgnoreError | A function which will return whether to ignore a given error. |
Column Definition
A column definition definition specifies the name and description of the column, its data type, and the functions to call to hydrate the column (if the list call does not) and transform it (if the default transformation is not sufficient).
Argument | Description |
---|---|
Name | The column name. |
Type | The data type for this column. |
Description | The column description, added as a comment and used in help commands and documentation. |
Hydrate | You can explicitly specify the hydrate function function to populate this column. This is only needed if neither the default hydrate functions nor the List function return data for this column. |
Default | An optional default column value. |
Transform | An optional chain of transform functions to generate the column value. |
Column Data Types
Currently supported data types are:
Name | Type |
---|---|
ColumnType_BOOL | Boolean |
ColumnType_INT | Integer |
ColumnType_DOUBLE | Double precision floating point |
ColumnType_STRING | String |
ColumnType_JSON | JSON |
ColumnType_DATETIME | Date/Time (Deprecated - use ColumnType_TIMESTAMP) |
ColumnType_TIMESTAMP | Date/Time |
ColumnType_IPADDR | IP Address |
ColumnType_CIDR | IP network CIDR |
ColumnType_UNKNOWN | Unknown |
ColumnType_INET | Either an IP Address or an IP network CIDR |
ColumnType_LTREE | Ltree |
Implementing Tables
By convention, each table should be implemented in a separate file named table_{table name}.go
. Each table will have a single table definition function that returns a pointer to a plugin.Table
(this is the function specified in the TableMap
of the plugin definition). The function name is typically the table name in camel case (per golang standards) prefixed by table
.
The table definition specifies the name and description of the table, a list of column definitions, and the functions to call in order to list the data for all the rows, or to get data for a single row.
When a connection is created, Steampipe uses the table and column definitions to create the Postgres foreign tables, however the tables don't store the data — the data is populated (hydrated) when a query is run.
The basic flow is:
A user runs a steampipe query against the database
Postgres parses the query and sends the parsed request to the Steampipe FDW.
The Steampipe Foreign Data Wrapper (Steampipe FDW) determines what tables and columns are required.
The FDW calls the appropriate Hydrate Functions in the plugin, which fetch the appropriate data from the API, cloud provider, etc.
- Each table defines two special hydrate functions,
List
andGet
. TheList
orGet
will always be called before any other hydrate function in the table, as the other functions typically depend on the result of the Get or List call. - Whether
List
orGet
is called depends upon whether the qualifiers (inwhere
clauses andjoin...on
) match theKeyColumns
. This allows Steampipe to fetch only the "row" data that it needs. Qualifiers (aka quals) enable Steampipe to map a Postgres constraint (e.g.where created_at > date('2023-01-01')
) to the API parameter (e.g.since=1673992596000
) that the plugin's supporting SDK uses to fetch results matching the Postgres constraint. See How To enhance a plugin with a new table that supports 'quals' for a complete example. - Multiple columns may (and usually do) get built from the same hydrate function, but steampipe only calls the hydrate functions for the columns requested (specified in the
select
,join
, orwhere
). This allows steampipe to call only those APIs for the "column" data requested in the query.
- Each table defines two special hydrate functions,
The Transform Functions are called for each column. The transform functions extract and/or reformat data returned by the hydrate functions into the format to be returned in the column.
The plugin returns the transformed data to the Steampipe FDW
Steampipe FDW returns the results to the database
Hydrate Functions
A hydrate function connects to an external system or service and gathers data to fill a database table.
Get
and List
are hydrate functions, defined in the Table Definition, that have these special characteristics:
Every table must define a
List
and/orGet
function.The
List
orGet
will always be called before any other hydrate function in the table, as the other functions typically depend on the result of theGet
orList
call.Whether
List
orGet
is called depends upon whether the qualifiers (inwhere
clauses andjoin...on
) match theKeyColumns
defined in the Get Config. This enables Steampipe to fetch only the "row" data that it needs.Typically, hydrate functions return a single data item (data for a single row). List functions are an exception — they stream data for multiple rows using the QueryData object, and return
nil
.The
Get
function will usually get the key column data from theQueryData.KeyColumnQuals
so that it can get the appropriate item as based on the qualifiers (where
clause,join...on
). If theGet
hydrate function is used as both aGet
function AND a normal hydrate function, you should get the key column data from theHydrateData.Item
if it is not nil, and use theQueryData.KeyColumnQuals
otherwise.
About List Functions
A List
function retrieves all the items of a particular resource type from an API. For example, the github_my_gist table supports the query:
select*fromgithub_my_gist
The function tableGitHubMyGist
defines the table like so.
func tableGitHubMyGist() *plugin.Table {return &plugin.Table{Name: "github_my_gist",Description: "GitHub Gists owned by you. GitHub Gist is a simple way to share snippets and pastes with others.",List: &plugin.ListConfig{Hydrate: tableGitHubMyGistList,},Columns: gitHubGistColumns(),}}
The table's List
property refers, by way of the Hydrate
property, to a Steampipe function that lists gists, tableGitHubMyGistList. That function calls the GitHub Go SDK's GistsService.List and returns an array of pointers to items of type Gist
as defined in the Go SDK.
type Gist struct {ID *string `json:"id,omitempty"`Description *string `json:"description,omitempty"`Public *bool `json:"public,omitempty"`Owner *User `json:"owner,omitempty"`Files map[GistFilename]GistFile `json:"files,omitempty"`Comments *int `json:"comments,omitempty"`HTMLURL *string `json:"html_url,omitempty"`GitPullURL *string `json:"git_pull_url,omitempty"`GitPushURL *string `json:"git_push_url,omitempty"`CreatedAt *Timestamp `json:"created_at,omitempty"`UpdatedAt *Timestamp `json:"updated_at,omitempty"`NodeID *string `json:"node_id,omitempty"`}
The Columns
property in tableGitHubMyGist
refers to the function gitHubGistColumns
, which is shared with a related table table_github_gist, and which maps that Go schema to this database schema.
func gitHubGistColumns() []*plugin.Column {return []*plugin.Column{// Top columns{Name: "id", Type: proto.ColumnType_STRING, Description: "The unique id of the gist."},{Name: "description", Type: proto.ColumnType_STRING, Description: "The gist description."},{Name: "public", Type: proto.ColumnType_BOOL, Description: "If true, the gist is public, otherwise it is private."},{Name: "html_url", Type: proto.ColumnType_STRING, Description: "The HTML URL of the gist."},{Name: "comments", Type: proto.ColumnType_INT, Description: "The number of comments for the gist."},{Name: "created_at", Type: proto.ColumnType_TIMESTAMP, Transform: transform.FromField("CreatedAt").Transform(convertTimestamp), Description: "The timestamp when the gist was created."},{Name: "git_pull_url", Type: proto.ColumnType_STRING, Description: "The https url to pull or clone the gist."},{Name: "git_push_url", Type: proto.ColumnType_STRING, Description: "The https url to push the gist."},{Name: "node_id", Type: proto.ColumnType_STRING, Description: "The Node ID of the gist."},// Only load relevant fields from the owner{Name: "owner_id", Type: proto.ColumnType_INT, Description: "The user id (number) of the gist owner.", Transform: transform.FromField("Owner.ID")},{Name: "owner_login", Type: proto.ColumnType_STRING, Description: "The user login name of the gist owner.", Transform: transform.FromField("Owner.Login")},{Name: "owner_type", Type: proto.ColumnType_STRING, Description: "The type of the gist owner (User or Organization).", Transform: transform.FromField("Owner.Type")},{Name: "updated_at", Type: proto.ColumnType_TIMESTAMP, Transform: transform.FromField("UpdatedAt").Transform(convertTimestamp), Description: "The timestamp when the gist was last updated."},{Name: "files", Type: proto.ColumnType_JSON, Transform: transform.FromField("Files").Transform(gistFileMapToArray), Description: "Files in the gist."},}}
Here's the tableGitHubMyGistList function.
func tableGitHubMyGistList(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {client := connect(ctx, d)opt := &github.GistListOptions{ListOptions: github.ListOptions{PerPage: 100}}limit := d.QueryContext.Limit // the SQL LIMITif limit != nil {if *limit < int64(opt.ListOptions.PerPage) {opt.ListOptions.PerPage = int(*limit)}}for {gists, resp, err := client.Gists.List(ctx, "", opt) // call https://pkg.go.dev/github.com/google/go-github/v60/github#GistsService.Listif err != nil {return nil, err}for _, i := range gists {if i != nil {d.StreamListItem(ctx, i) // send the item to steampipe}// Context can be cancelled due to manual cancellation or the limit has been hitif d.RowsRemaining(ctx) == 0 {return nil, nil}}if resp.NextPage == 0 {break}opt.Page = resp.NextPage}return nil, nil}
A Steampipe List
function is one of two special forms of hydrate function — Get
is the other — that take precedence over other hydrate functions which are declared using the HydrateConfig
property of a table definition.
About Get Functions
A Get
function fetches a single item by its key. While it's possible to define a table that only uses Get
, the common pattern combines List
to retrieve basic data and Get
to enrich it. For example, here's the definition of the table github_gitignore.
func tableGitHubGitignore() *plugin.Table {return &plugin.Table{Name: "github_gitignore",Description: "GitHub defined .gitignore templates that you can associate with your repository.",List: &plugin.ListConfig{Hydrate: tableGitHubGitignoreList,},Get: &plugin.GetConfig{KeyColumns: plugin.SingleColumn("name"),ShouldIgnoreError: isNotFoundError([]string{"404"}),Hydrate: tableGitHubGitignoreGetData,},Columns: []*plugin.Column{// Top columns{Name: "name", Type: proto.ColumnType_STRING, Description: "Name of the gitignore template."},{Name: "source", Type: proto.ColumnType_STRING, Hydrate: tableGitHubGitignoreGetData, Description: "Source code of the gitignore template."},},}}
In this case, the source
column data is not included in the API response from the tableGitHubGitignoreList
function. So the tableGitHubGitignoreGetData
function is specified as the Hydrate
function for that column.
The List
function, tableGitHubGitignoreList, calls the SDK's GitignoresService.List which returns an array of strings which are the names of .gitignore templates.
The Get
function, tableGitHubGitignoreGetData, receives the name of a template and calls the SDK's GitignoresService.Get to return an item of type GitIgnore, which corresponds to the Columns
in the table definition.
type Gitignore struct {Name *string `json:"name,omitempty"`Source *string `json:"source,omitempty"`}
For example, here's the result for select * from github_gitignore where name = 'go'
.
+------+-------------------------------------------------------------------------------------------+| name | source |+------+-------------------------------------------------------------------------------------------+| Go | # If you prefer the allow list template instead of the deny list, see community template: || | # https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore || | # || | # Binaries for programs and plugins || | *.exe || | *.exe~ || | *.dll || | *.so || | *.dylib |+------+-------------------------------------------------------------------------------------------+
The List
function finds all the names of the templates provided by GitHub, and the Get
function adds the source
column.
When the column definition doesn't need to specify a Hydrate
When the underlying SDK functions for a List
and Get
both return complete information, the column definition doesn't need to specify a Hydrate
. For example, here's the definition for table_github_actions_artifact.
func tableGitHubActionsArtifact() *plugin.Table {return &plugin.Table{Name: "github_actions_artifact",Description: "Artifacts allow you to share data between jobs in a workflow and store data once that workflow has completed.",List: &plugin.ListConfig{KeyColumns: plugin.SingleColumn("repository_full_name"),ShouldIgnoreError: isNotFoundError([]string{"404"}),Hydrate: tableGitHubArtifactList,},Get: &plugin.GetConfig{KeyColumns: plugin.AllColumns([]string{"repository_full_name", "id"}),ShouldIgnoreError: isNotFoundError([]string{"404"}),Hydrate: tableGitHubArtifactGet,},Columns: []*plugin.Column{// Top columns{Name: "repository_full_name", Type: proto.ColumnType_STRING, Transform: transform.FromQual("repository_full_name"), Description: "Full name of the repository that contains the artifact."},{Name: "name", Type: proto.ColumnType_STRING, Description: "The name of the artifact."},{Name: "id", Type: proto.ColumnType_INT, Description: "Unique ID of the artifact."},{Name: "size_in_bytes", Type: proto.ColumnType_INT, Description: "Size of the artifact in bytes."},// Other columns{Name: "archive_download_url", Type: proto.ColumnType_STRING, Transform: transform.FromField("ArchiveDownloadURL"), Description: "Archive download URL for the artifact."},{Name: "created_at", Type: proto.ColumnType_TIMESTAMP, Transform: transform.FromField("CreatedAt").Transform(convertTimestamp), Description: "Time when the artifact was created."},{Name: "expired", Type: proto.ColumnType_BOOL, Description: "It defines whether the artifact is expires or not."},{Name: "expires_at", Type: proto.ColumnType_TIMESTAMP, Transform: transform.FromField("ExpiresAt").Transform(convertTimestamp), Description: "Time when the artifact expires."},{Name: "node_id", Type: proto.ColumnType_STRING, Description: "Node where GitHub stores this data internally."},},}}
The SDK's ListArtifacts returns an array of Artifact and its GetArtifact returns a single Artifact
object. As with tableGitHubGitignore
, these are separate APIs — wrapped by the Go SDK — to list basic info and get details artifacts. If the query's where
or join...on
specifies an id
, the plugin will use the optimal Get
function, otherwise the List
function, to call the corresponding APIs. Either way, the same API response matches the schema declared in Columns
.
type Artifact struct {ID *int64 `json:"id,omitempty"`NodeID *string `json:"node_id,omitempty"`Name *string `json:"name,omitempty"`SizeInBytes *int64 `json:"size_in_bytes,omitempty"`URL *string `json:"url,omitempty"`ArchiveDownloadURL *string `json:"archive_download_url,omitempty"`Expired *bool `json:"expired,omitempty"`CreatedAt *Timestamp `json:"created_at,omitempty"`UpdatedAt *Timestamp `json:"updated_at,omitempty"`ExpiresAt *Timestamp `json:"expires_at,omitempty"`WorkflowRun *ArtifactWorkflowRun `json:"workflow_run,omitempty"`}
When Steampipe calls List
vs Get
Which function is called when you query the github_actions_artifact
table? It depends! We can use diagnostic mode to explore. This query, which lists all the artifacts in a repo, uses the List
function tableGitHubArtifactList
.
STEAMPIPE_DIAGNOSTIC_LEVEL=all steampipe service start> select jsonb_pretty(_ctx) from github_actions_artifactwhere repository_full_name = 'turbot/steampipe-plugin-github'+----------------------------------------------------------------+| jsonb_pretty |+----------------------------------------------------------------+| { || "steampipe": { || "sdk_version": "5.8.0" || }, || "diagnostics": { || "calls": [ || { || "type": "list", || "scope_values": { || "table": "github_actions_artifact", || "connection": "github", || "function_name": "tableGitHubArtifactList" || }, || "function_name": "tableGitHubArtifactList", || "rate_limiters": [ || ], || "rate_limiter_delay_ms": 0 || } || ] || }, || "connection_name": "github" || } |
This query, which uses the qualifier id
, uses the Get
function tableGitHubArtifactGet
.
> select jsonb_pretty(_ctx) from github_actions_artifact where id = '1248325644' and repository_full_name = 'turbot/steampipe-plugin-github'+---------------------------------------------------------------+| jsonb_pretty |+---------------------------------------------------------------+| { || "steampipe": { || "sdk_version": "5.8.0" || }, || "diagnostics": { || "calls": [ || { || "type": "get", || "scope_values": { || "table": "github_actions_artifact", || "connection": "github", || "function_name": "tableGitHubArtifactGet" || }, || "function_name": "tableGitHubArtifactGet", || "rate_limiters": [ || ], || "rate_limiter_delay_ms": 0 || } || ] || }, || "connection_name": "github" || } |+---------------------------------------------------------------+
This works because id
is one of the KeyColumns
in the Get
property of the table definition. That enables the Steampipe plugin SDK to choose the more optimal tableGitHubArtifactGet
function when the id
is known and it isn't necessary to list all artifacts in order to retrieve just a single one.
List or Get in Combination with Hydrate
In addition to to the special List
and Get
hydrate functions, there's a class of general hydrate functions that enrich what's returned by List
or Get
. In table_aws_cloudtrail_trail.go
, getCloudTrailStatus is an example of this kind of function.
Steampipe knows it's a HydrateFunc
because the table definition declares it in the HydrateConfig property of the table definition.
HydrateConfig: []plugin.HydrateConfig{{Func: getCloudtrailTrailStatus,Tags: map[string]string{"service": "cloudtrail", "action": "GetTrailStatus"},},...},
A HydrateFunc
is typically used in combination with List
or Get
.
For example, the List
function for table_aws_fms_app_list.go
uses the SDK's NewListAppsListsPaginator to get basic info declared in the Columns
property of the table definition.
{Name: "list_name",Description: "The name of the applications list.",Type: proto.ColumnType_STRING,Transform: transform.FromField("ListName", "AppsList.ListName"),},{Name: "list_id",Description: "The ID of the applications list.",Type: proto.ColumnType_STRING,Transform: transform.FromField("ListId", "AppsList.ListId"),},{Name: "arn",Description: "The Amazon Resource Name (ARN) of the applications list.",Type: proto.ColumnType_STRING,Transform: transform.FromField("ListArn", "AppsListArn"),},
These correspond to the type AppsListDataSummary in the AWS SDK.
The Columns
property also declares four other columns that use the HydrateFunc
called getFMSAppList.
{Name: "create_time",Description: "The time that the Firewall Manager applications list was created.",Type: proto.ColumnType_TIMESTAMP,Hydrate: getFmsAppList,},{Name: "last_update_time",Description: "The time that the Firewall Manager applications list was last updated.",Type: proto.ColumnType_TIMESTAMP,Hydrate: getFmsAppList,},{Name: "list_update_token",Description: "A unique identifier for each update to the list. When you update the list, the update token must match the token of the current version of the application list.",Type: proto.ColumnType_STRING,Hydrate: getFmsAppList,},{Name: "previous_apps_list",Description: "A map of previous version numbers to their corresponding App object arrays.",Type: proto.ColumnType_JSON,Hydrate: getFmsAppList,},{Name: "apps_list",Description: "An array of applications in the Firewall Manager applications list.",Type: proto.ColumnType_JSON,Hydrate: getFmsAppList,},
Those columns correspond to fields of the type AppsListData in the AWS SDK.
type AppsListData struct {// An array of applications in the Firewall Manager applications list.//// This member is required.AppsList []App// The name of the Firewall Manager applications list.//// This member is required.ListName *string// The time that the Firewall Manager applications list was created.CreateTime *time.Time// The time that the Firewall Manager applications list was last updated.LastUpdateTime *time.Time// The ID of the Firewall Manager applications list.ListId *string// A unique identifier for each update to the list. When you update the list, the// update token must match the token of the current version of the application// list. You can retrieve the update token by getting the list.ListUpdateToken *string// A map of previous version numbers to their corresponding App object arrays.PreviousAppsList map[string][]App}
HydrateConfig
Use HydrateConfig
in a table definition to provide granular control over the behavior of a hydrate function.
Things you can control with a HydrateConfig
:
Errors to ignore.
Errors to retry.
Max concurrent calls to allow.
Hydrate dependencies
Rate-limiter tags
For a Get
or List
, you can specify errors to ignore and/or retry using DefaultIgnoreConfig
and DefaultRetryConfig
as seen here in the Fastly plugin.
func Plugin(ctx context.Context) *plugin.Plugin {p := &plugin.Plugin{Name: "steampipe-plugin-fastly",ConnectionConfigSchema: &plugin.ConnectionConfigSchema{NewInstance: ConfigInstance,},DefaultTransform: transform.FromGo().NullIfZero(),DefaultIgnoreConfig: &plugin.IgnoreConfig{ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"404"}),},DefaultRetryConfig: &plugin.RetryConfig{ShouldRetryErrorFunc: shouldRetryError([]string{"429"}),},TableMap: map[string]*plugin.Table{"fastly_acl": tableFastlyACL(ctx),..."fastly_token": tableFastlyToken(ctx),},}return p}
For other hydrate functions, you do this with HydrateConfig
. Here's how the oci_identity_tenancy
table configures error handling for the getRetentionPeriod
function.
HydrateConfig: []plugin.HydrateConfig{{Func: getRetentionPeriod,ShouldIgnoreError: isNotFoundError([]string{"404"}),},},
You can similarly use ShouldRetryError
along with a corresponding function that returns true if, for example, an API call its a rate limit.
func shouldRetryError(err error) bool {if cloudflareErr, ok := err.(*cloudflare.APIRequestError); ok {return cloudflareErr.ClientRateLimited()}return false}
You can likewise use MaxConcurrency
to limit the number of calls to a hydrate function.
In practice, the granular controls afforded by ShouldIgnoreError
, ShouldRetryError
, and MaxConcurrency
are not much used at the level of individual hydrate functions. Plugins are likelier to assert such control globally. But the flexibility is threre if you need it.
Two features of HydrateConfig
that are used quite a bit are Depends
and Tags
.
Use Depends
to make a function depend on one or more others. In aws_s3_bucket
, the function getBucketLocation returns the client region that's needed by all the other functions, so they all depend on it.
HydrateConfig: []plugin.HydrateConfig{{Func: getBucketLocation,Tags: map[string]string{"service": "s3", "action": "GetBucketLocation"},},{Func: getBucketIsPublic,Depends: []plugin.HydrateFunc{getBucketLocation},Tags: map[string]string{"service": "s3", "action": "GetBucketPolicyStatus"},},{Func: getBucketVersioning,Depends: []plugin.HydrateFunc{getBucketLocation},Tags: map[string]string{"service": "s3", "action": "GetBucketVersioning"},},
Use Tags
to expose a hydrate function to control by a limiter. In AWS plugin's, aws_config_rule
table, the HydrateConfig
specifies additional hydrate functions that fetch tags and compliance details for each config rule.
HydrateConfig:plugin.HydrateConfig{{Func: getConfigRuleTags,Tags: map[string]string{"service": "config", "action": "ListTagsForResource"},},{Func: getComplianceByConfigRules,Tags: map[string]string{"service": "config", "action": "DescribeComplianceByConfigRule"},},},
In this example the Func
property names getConfigRuleTags
and getComplianceByConfigRules
as additional hydrate functions that fetch tags and compliance details for each config rule, respectively. The Tags
property enables a rate limiter to target these functions. (See also function-tags below.)
Memoize: Caching hydrate results
The Memoize function can be used to cache the results of a HydrateFunc
.
In the multi_region.go file of steampipe-plugin-aws
repository, the listRegionsForServiceCacheKey
function is used to create a custom cache key for the listRegionsForService
function. This cache key includes the service ID, which is unique for each AWS service.
Here's a simplified version of the code:
func listRegionsForServiceCacheKey(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {serviceID := h.Item.(string)key := fmt.Sprintf("listRegionsForService-%s", serviceID)return key, nil}var listRegionsForService = plugin.HydrateFunc(listRegionsForServiceUncached).Memoize(memoize.WithCacheKeyFunction(listRegionsForServiceCacheKey))
In this example, Memoize
caches the results of listRegionsForServiceUncached
. The WithCacheKeyFunction
option specifies a custom function (listRegionsForServiceCacheKey
) to generate the cache key. This function takes the service ID from the hydrate data and includes it in the cache key, ensuring a unique cache key for each AWS service.
This is a common pattern when using Memoize
: you define a HydrateFunc
and then wrap it with Memoize
to enable caching. You can also use the WithCacheKeyFunction
option to specify a custom function that generates the cache key, which is especially useful when you need to include additional context in the cache key.
Transform Functions
Transform functions are used to extract and/or reformat data returned by a hydrate function into the desired type/format for a column. You can call your own transform function with From
, but you probably don't need to write one -- The SDK provides many that cover the most common cases. You can chain transforms together, but the transform chain must be started with a From
function:
Name | Description |
---|---|
FromConstant | Return a constant value (specified by 'param'). |
FromField | Generate a value by retrieving a field from the source item. |
FromValue | Generate a value by returning the raw hydrate item. |
FromCamel | Generate a value by converting the given field name to camel case and retrieving from the source item. |
FromGo | Generate a value by converting the given field name to camel case and retrieving from the source item. |
From | Generate a value by calling a 'transformFunc'. |
FromJSONTag | Generate a value by finding a struct property with the json tag matching the column name. |
FromTag | Generate a value by finding a struct property with the tag 'tagName' matching the column name. |
FromP | Generate a value by calling 'transformFunc' passing param. |
Additional functions can be chained after a From
function to transform the data:
Name | Description |
---|---|
Transform | Apply an arbitrary transform to the data (specified by 'transformFunc'). |
TransformP | Apply an arbitrary transform to the data, passing a parameter. |
NullIfEqual | If the input value equals the transform param, return nil. |
NullIfZero | If the input value equals the zero value of its type, return nil. |
Translating SQL Operators to API Calls
When you write SQL that resolves to API calls, you want a SQL operator like >
to influence an API call in the expected way.
Consider this query:
SELECT * FROM github_issue WHERE updated_at > '2022-01-01'
You would like the underlying API call to filter accordingly.
In order to intercept the SQL operator, and implement it in your table code, you declare it in the KeyColumns
property of the table.
KeyColumns: []*plugin.KeyColumn{{Name: "repository_full_name",Require: plugin.Required,},{Name: "author_login",Require: plugin.Optional,},{Name: "state",Require: plugin.Optional,},{Name: "updated_at",Require: plugin.Optional,Operators: []string{">", ">="}, // declare operators your get/list/hydrate function handles},
Then, in your table code, you write a handler for the column. The handler configures the API to filter on one or more operators.
if d.Quals["updated_at"] != nil {for _, q := range d.Quals["updated_at"].Quals {givenTime := q.Value.GetTimestampValue().AsTime() // timestamp from the SQL queryafterTime := givenTime.Add(time.Second * 1) // one second after the given timeswitch q.Operator {case ">":filters.Since = githubv4.NewDateTime(githubv4.DateTime{Time: afterTime}) // handle WHERE updated_at > '2022-01-01'case ">=":filters.Since = githubv4.NewDateTime(githubv4.DateTime{Time: givenTime}) // handle WHERE updated_at >= '2022-01-01'}}}
Example: Table Definition File
package zendeskimport ("context""github.com/nukosuke/go-zendesk/zendesk""github.com/turbot/steampipe-plugin-sdk/grpc/proto""github.com/turbot/steampipe-plugin-sdk/plugin")func tableZendeskUser() *plugin.Table {return &plugin.Table{Name: "zendesk_user",Description: "Zendesk Support has three types of users: end users (your customers), agents, and administrators.",List: &plugin.ListConfig{Hydrate: listUser,},Get: &plugin.GetConfig{KeyColumns: plugin.SingleColumn("id"),Hydrate: getUser,},Columns: []*plugin.Column{{Name: "active", Type: proto.ColumnType_BOOL, Description: "False if the user has been deleted"},{Name: "alias", Type: proto.ColumnType_STRING, Description: "An alias displayed to end users"},{Name: "chat_only", Type: proto.ColumnType_BOOL, Description: "Whether or not the user is a chat-only agent"},{Name: "created_at", Type: proto.ColumnType_TIMESTAMP, Description: "The time the user was created"},{Name: "custom_role_id", Type: proto.ColumnType_INT, Description: "A custom role if the user is an agent on the Enterprise plan"},{Name: "default_group_id", Type: proto.ColumnType_INT, Description: "The id of the user's default group"},{Name: "details", Type: proto.ColumnType_STRING, Description: "Any details you want to store about the user, such as an address"},{Name: "email", Type: proto.ColumnType_STRING, Description: "The user's primary email address. *Writeable on create only. On update, a secondary email is added."},{Name: "external_id", Type: proto.ColumnType_STRING, Description: "A unique identifier from another system. The API treats the id as case insensitive. Example: \"ian1\" and \"Ian1\" are the same user"},{Name: "id", Type: proto.ColumnType_INT, Description: "Automatically assigned when the user is created"},{Name: "last_login_at", Type: proto.ColumnType_TIMESTAMP, Description: "The last time the user signed in to Zendesk Support"},{Name: "locale", Type: proto.ColumnType_STRING, Description: "The user's locale. A BCP-47 compliant tag for the locale. If both \"locale\" and \"locale_id\" are present on create or update, \"locale_id\" is ignored and only \"locale\" is used."},{Name: "locale_id", Type: proto.ColumnType_INT, Description: "The user's language identifier"},{Name: "moderator", Type: proto.ColumnType_BOOL, Description: "Designates whether the user has forum moderation capabilities"},{Name: "name", Type: proto.ColumnType_STRING, Description: "The user's name"},{Name: "notes", Type: proto.ColumnType_STRING, Description: "Any notes you want to store about the user"},{Name: "only_private_comments", Type: proto.ColumnType_BOOL, Description: "true if the user can only create private comments"},{Name: "organization_id", Type: proto.ColumnType_INT, Description: "The id of the user's organization. If the user has more than one organization memberships, the id of the user's default organization"},{Name: "phone", Type: proto.ColumnType_STRING, Description: "The user's primary phone number."},{Name: "photo_content_type", Type: proto.ColumnType_STRING, Description: "The content type of the image. Example value: \"image/png\""},{Name: "photo_content_url", Type: proto.ColumnType_STRING, Description: "A full URL where the attachment image file can be downloaded"},{Name: "photo_deleted", Type: proto.ColumnType_STRING, Description: "If true, the attachment has been deleted"},{Name: "photo_file_name", Type: proto.ColumnType_STRING, Description: "The name of the image file"},{Name: "photo_id", Type: proto.ColumnType_INT, Description: "Automatically assigned when created"},{Name: "photo_inline", Type: proto.ColumnType_BOOL, Description: "If true, the attachment is excluded from the attachment list and the attachment's URL can be referenced within the comment of a ticket. Default is false"},{Name: "photo_size", Type: proto.ColumnType_INT, Description: "The size of the image file in bytes"},{Name: "photo_thumbnails", Type: proto.ColumnType_JSON, Description: "An array of attachment objects. Note that photo thumbnails do not have thumbnails"},{Name: "report_csv", Type: proto.ColumnType_BOOL, Description: "Whether or not the user can access the CSV report on the Search tab of the Reporting page in the Support admin interface."},{Name: "restricted_agent", Type: proto.ColumnType_BOOL, Description: "If the agent has any restrictions; false for admins and unrestricted agents, true for other agents"},{Name: "role", Type: proto.ColumnType_STRING, Description: "The user's role. Possible values are \"end-user\", \"agent\", or \"admin\""},{Name: "role_type", Type: proto.ColumnType_INT, Description: "The user's role id. 0 for custom agents, 1 for light agent, 2 for chat agent, and 3 for chat agent added to the Support account as a contributor (Chat Phase 4)"},{Name: "shared", Type: proto.ColumnType_BOOL, Description: "If the user is shared from a different Zendesk Support instance. Ticket sharing accounts only"},{Name: "shared_agent", Type: proto.ColumnType_BOOL, Description: "If the user is a shared agent from a different Zendesk Support instance. Ticket sharing accounts only"},{Name: "shared_phone_number", Type: proto.ColumnType_BOOL, Description: "Whether the phone number is shared or not."},{Name: "signature", Type: proto.ColumnType_STRING, Description: "The user's signature. Only agents and admins can have signatures"},{Name: "suspended", Type: proto.ColumnType_BOOL, Description: "If the agent is suspended. Tickets from suspended users are also suspended, and these users cannot sign in to the end user portal"},{Name: "tags", Type: proto.ColumnType_JSON, Description: "The user's tags. Only present if your account has user tagging enabled"},{Name: "ticket_restriction", Type: proto.ColumnType_STRING, Description: "Specifies which tickets the user has access to. Possible values are: \"organization\", \"groups\", \"assigned\", \"requested\", null"},{Name: "timezone", Type: proto.ColumnType_STRING, Description: "The user's time zone."},{Name: "two_factor_auth_enabled", Type: proto.ColumnType_BOOL, Description: "If two factor authentication is enabled"},{Name: "updated_at", Type: proto.ColumnType_TIMESTAMP, Description: "The time the user was last updated"},{Name: "url", Type: proto.ColumnType_STRING, Description: "The user's API url"},{Name: "user_fields", Type: proto.ColumnType_JSON, Description: "Values of custom fields in the user's profile."},{Name: "verified", Type: proto.ColumnType_BOOL, Description: "Any of the user's identities is verified."},},}}func listUser(ctx context.Context, d *plugin.QueryData, _ *plugin.HydrateData) (interface{}, error) {conn, err := connect(ctx)if err != nil {return nil, err}opts := &zendesk.UserListOptions{PageOptions: zendesk.PageOptions{Page: 1,PerPage: 100,},}for true {users, page, err := conn.GetUsers(ctx, opts)if err != nil {return nil, err}for _, t := range users {d.StreamListItem(ctx, t)}if !page.HasNext() {break}opts.Page++}return nil, nil}func getUser(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {conn, err := connect(ctx)if err != nil {return nil, err}quals := d.KeyColumnQualsplugin.Logger(ctx).Warn("getUser", "quals", quals)id := quals["id"].GetInt64Value()plugin.Logger(ctx).Warn("getUser", "id", id)result, err := conn.GetUser(ctx, id)if err != nil {return nil, err}return result, nil}
Dynamic Tables
In the plugin definition, if SchemaMode
is set to dynamic
, every time
Steampipe starts, the plugin's schema will be checked for any changes since the
last time it loaded, and re-import the schema if it detects any.
Dynamic tables are useful when you are building a plugin whose schema is not known at compile time; instead, its schema will be generated at runtime. For instance, a plugin with dynamic tables is useful if you want to load CSV files as tables from one or more directories. Each of these CSV files may have different column structures, resulting in a different structure for each table.
In order to create a dynamic table, in the plugin definition, TableMapFunc
should call a function that returns map[string]*plugin.Table
.
For instance, in the CSV plugin:
func Plugin(ctx context.Context) *plugin.Plugin {p := &plugin.Plugin{Name: "steampipe-plugin-csv",ConnectionConfigSchema: &plugin.ConnectionConfigSchema{NewInstance: ConfigInstance,Schema: ConfigSchema,},DefaultTransform: transform.FromGo().NullIfZero(),SchemaMode: plugin.SchemaModeDynamic,TableMapFunc: PluginTables,}return p}func PluginTables(ctx context.Context, p *plugin.Plugin) (map[string]*plugin.Table, error) {// Initialize tablestables := map[string]*plugin.Table{}// Search for CSV files to create as tablespaths, err := csvList(ctx, p)if err != nil {return nil, err}for _, i := range paths {tableCtx := context.WithValue(ctx, "path", i)base := filepath.Base(i)// tableCSV returns a *plugin.Table typetables[base[0:len(base)-len(filepath.Ext(base))]] = tableCSV(tableCtx, p)}return tables, nil}
The tableCSV
function mentioned in the example above looks for all CSV files in the configured paths, and for each one, builds a *plugin.Table
type:
func tableCSV(ctx context.Context, p *plugin.Plugin) *plugin.Table {path := ctx.Value("path").(string)csvFile, err := os.Open(path)if err != nil {plugin.Logger(ctx).Error("Could not open CSV file", "path", path)panic(err)}r := csv.NewReader(csvFile)csvConfig := GetConfig(p.Connection)if csvConfig.Separator != nil && *csvConfig.Separator != "" {r.Comma = rune((*csvConfig.Separator)[0])}if csvConfig.Comment != nil {if *csvConfig.Comment == "" {// Disable commentsr.Comment = 0} else {// Set the comment characterr.Comment = rune((*csvConfig.Comment)[0])}}// Read the header to peak at the column namesheader, err := r.Read()if err != nil {plugin.Logger(ctx).Error("Error parsing CSV header:", "path", path, "header", header, "err", err)panic(err)}cols := []*plugin.Column{}for idx, i := range header {cols = append(cols, &plugin.Column{Name: i, Type: proto.ColumnType_STRING, Transform: transform.FromField(i), Description: fmt.Sprintf("Field %d.", idx)})}return &plugin.Table{Name: path,Description: fmt.Sprintf("CSV file at %s", path),List: &plugin.ListConfig{Hydrate: listCSVWithPath(path),},Columns: cols,}}
The end result is when using the CSV plugin, whenever Steampipe starts, it will
check for any new, deleted, and modified CSV files in the configured paths
and create any discovered CSVs as tables. The CSV filenames are turned directly
into table names.
For more information on how the CSV plugin can be queried as a result of being a dynamic table, please see the {csv_filename} table document.
Client-Side Rate Limiting
The Steampipe Plugin SDK supports a client-side rate limiting implementation to allow users to define plugin limiter
blocks to control concurrency and rate limiting. Support for limiters is built in to the SDK and basic functionality requires no changes to the plugin code; Just including the SDK will enable users to create limiters for your plugin using the built in connection
, table
, and function_name
scopes. You can add additional flexibility by adding function tags and by accounting for paging in List calls.
Function Tags
Hydrate function tags provide useful diagnostic metadata, and they can also be used as scopes in rate limiters. Rate limiting requirements vary by plugin because the underlying APIs that they access implement rate limiting differently. Tags provide a way for a plugin author to scope rate limiters in a way that aligns with the API implementation.
Tags can be added to a ListConfig, GetConfig, or HydrateConfig.
//// TABLE DEFINITIONfunc tableAwsSnsTopic(_ context.Context) *plugin.Table {return &plugin.Table{Name: "aws_sns_topic",Description: "AWS SNS Topic",Get: &plugin.GetConfig{KeyColumns: plugin.SingleColumn("topic_arn"),IgnoreConfig: &plugin.IgnoreConfig{ShouldIgnoreErrorFunc: shouldIgnoreErrors([]string{"NotFound", "InvalidParameter"}),},Hydrate: getTopicAttributes,Tags: map[string]string{"service": "sns", "action": "GetTopicAttributes"},},List: &plugin.ListConfig{Hydrate: listAwsSnsTopics,Tags: map[string]string{"service": "sns", "action": "ListTopics"},},HydrateConfig: []plugin.HydrateConfig{{Func: listTagsForSnsTopic,Tags: map[string]string{"service": "sns", "action": "ListTagsForResource"},},{Func: getTopicAttributes,Tags: map[string]string{"service": "sns", "action": "GetTopicAttributes"},},},...
Once the tags are added to the plugin, you can use them in the scope
and where
arguments for your rate limiter.
plugin "aws" {limiter "sns_get_topic_attributes_us_east_1" {bucket_size = 3000fill_rate = 3000scope = ["connection", "region", "service", "action"]where = "action = 'GetTopicAttributes' and service = 'sns' and region = 'us-east-1' "}}
Accounting for Paged List Calls
The Steampipe plugin SDK transparently handles most of the details around waiting for limiters. List calls, however, usually iterate through pages of results, and each call to fetch a page must wait for any limiters that are defined. The SDK provides a hook, WaitForListRateLimit
, which should be called before paging to apply rate limiting to the list call:
// List callfor paginator.HasMorePages() {// apply rate limitingd.WaitForListRateLimit(ctx)output, err := paginator.NextPage(ctx)if err != nil {plugin.Logger(ctx).Error("List error", "api_error", err)return nil, err}for _, items := range output.Items {d.StreamListItem(ctx, items)// Context can be cancelled due to manual cancellation or the limit has been hitif d.RowsRemaining(ctx) == 0 {return nil, nil}}}
Logging
A logger is passed to the plugin via the context. You can use the logger to write messages to the log at standard log levels:
logger := plugin.Logger(ctx)logger.Info("Log message and a variable", myVariable)
The plugin logs do not currently get written to the console, but are written to the plugin logs at ~/.steampipe/logs/plugin-YYYY-MM-DD.log
, e.g., ~/.steampipe/logs/plugin-2022-01-01.log
.
Steampipe uses the hclog package, which uses standard log levels (TRACE
, DEBUG
, INFO
, WARN
, ERROR
). By default, the log level is WARN
. You set it using the STEAMPIPE_LOG_LEVEL
environment variable:
export STEAMPIPE_LOG_LEVEL=TRACE
Installing and Testing Your Plugin
A plugin binary can be installed manually, and this is often convenient when developing the plugin. Steampipe will attempt to load any plugin that is referred to in a connection
configuration:
- The plugin binary file must have a
.plugin
extension - The plugin binary must reside in a subdirectory of the
~/.steampipe/plugins/local/
directory and must be the ONLY.plugin
file in that subdirectory - The
connection
must specify the path (relative to~/.steampipe/plugins/
) to the plugin in theplugin
argument
For example, consider a myplugin
plugin that you have developed. To install it:
- Create a subdirectory
.steampipe/plugins/local/myplugin
- Name your plugin binary
myplugin.plugin
, and copy it to.steampipe/plugins/local/myplugin/myplugin.plugin
- Create a
~/.steampipe/config/myplugin.spc
config file containing a connection definition that points to your plugin:connection "myplugin" {plugin = "local/myplugin"} - Your connection will be loaded the next time Steampipe runs. If Steampipe is running service mode, you must restart it to load the connection.