
Building Integration Connectors

The integration connectors support the exchange of metadata with third party technologies. This exchange may be inbound and/or outbound; synchronous, polling or event-driven.

An integration connector runs in an Open Metadata Integration Service (OMIS) which is in turn hosted in an Integration Daemon server. Each integration service provides a specialist interface designed to aid the integration with a specific type of technology. The integration connector implementation is therefore dependent on a specific OMIS.

Deployed Integration Connector

An integration connector is shown deployed in an integration service running in an integration daemon. The connector is linking to a third party technology and also calling the open metadata APIs of Egeria to manage the exchange of metadata.

The purpose of the integration daemon and its integration services is to minimise the effort required to integrate a third party technology into the open metadata ecosystem. They handle:

  • Management of configuration - including user security information.
  • Starting and stopping of your integration logic.
  • Thread management and polling.
  • Access to the open metadata repositories for query and maintenance of open metadata.
  • Writing to the audit log and maintaining measurements for performance metrics.
  • Metadata provenance.

This means you can focus on interacting with the third party technology and mapping its metadata to open metadata in your integration connector.

Integration connector interface

An integration connector can:

  • Listen on a blocking call, waiting for the third party technology to send a notification.
  • Register with an external notification service that sends notifications on its own thread.
  • Register a listener with its context to act on notifications from the partner OMAS's Out Topic.
  • Poll the third party technology each time that the integration daemon calls your integration connector's refresh() method.
  • Issue queries and maintenance (create, update, delete) requests to the open metadata repositories.

Access to open metadata is provided via a context object. Each Open Metadata Integration Service (OMIS) provides a context object that is specialized for a particular category of third party technology, so that your integration connector gets an interface to open metadata that is tailored to that technology. This typically includes:

  • The ability to register a listener to receive events from the OMAS's Out Topic, or send events to the OMAS's In Topic.
  • The ability to create and update metadata instances.
  • For assets, the ability to change an asset's visibility by changing its zone membership using the publish and withdraw methods.
  • The ability to delete metadata.
  • Various retrieval methods to help when comparing the metadata in the open metadata repositories with the metadata in the third party technology.

Each integration service defines the base class that an integration connector must extend if it is to run under that service. The base classes differ only in the type of context object that they support. Use the table below to select the integration service, and hence the base class, for your integration connector.

| Integration Service | Type of technology supported | Integration connector base class |
|---|---|---|
| API Integrator OMIS | API schemas | APIIntegratorConnector |
| Catalog Integrator OMIS | Assets and related metadata found in an asset catalog | CatalogIntegratorConnector |
| Database Integrator OMIS | Databases and their schema | DatabaseIntegratorConnector |
| Display Integrator OMIS | Forms, reports and the queries they depend on | DisplayIntegratorConnector |
| Files Integrator OMIS | Files and their internal structure | FilesIntegratorConnector |
| Infrastructure Integrator OMIS | IT infrastructure landscape such as hosts, platforms and servers | InfrastructureIntegratorConnector |
| Lineage Integrator OMIS | Processes and their execution flow | LineageIntegratorConnector |
| Organization Integrator OMIS | Onboard teams, profiles and users | OrganizationIntegratorConnector |
| Security Integrator OMIS | Publishing information about users and resources | SecurityIntegratorConnector |
| Topic Integrator OMIS | Event topics and the structure of the events they share | TopicIntegratorConnector |

The context object is a wrapper around the client of an Open Metadata Access Service (OMAS). It is accessed by the integration connector using the getContext() method. The OMAS supplies the properties, open metadata listener interface and event structures for the API.

OMIS OMAS Pair

Therefore, you need to add dependencies for the API module of your selected OMIS and for the API module of its partner OMAS. These are shown in the table below:

| Integration Service | Partner OMAS | Dependencies |
|---|---|---|
| API Integrator OMIS | Data Manager OMAS | api-integrator-api, data-manager-api |
| Catalog Integrator OMIS | Asset Manager OMAS | catalog-integrator-api, asset-manager-api |
| Database Integrator OMIS | Data Manager OMAS | database-integrator-api, data-manager-api |
| Display Integrator OMIS | Data Manager OMAS | display-integrator-api, data-manager-api |
| Files Integrator OMIS | Data Manager OMAS | files-integrator-api, data-manager-api |
| Infrastructure Integrator OMIS | IT Infrastructure OMAS | infrastructure-integrator-api, it-infrastructure-api |
| Lineage Integrator OMIS | Asset Manager OMAS | lineage-integrator-api, asset-manager-api |
| Organization Integrator OMIS | Community Profile OMAS | organization-integrator-api, community-profile-api |
| Security Integrator OMIS | Security Manager OMAS | security-integrator-api, security-manager-api |
| Topic Integrator OMIS | Data Manager OMAS | topic-integrator-api, data-manager-api |

These dependencies are in addition to the standard dependencies for an integration connector:

Example of the Maven dependencies for an integration connector that uses the Topic Integrator OMIS:
        <dependency>
            <groupId>org.odpi.egeria</groupId>
            <artifactId>topic-integrator-api</artifactId>
            <scope>provided</scope>
            <version>${open-metadata.version}</version>
        </dependency>

        <dependency>
            <groupId>org.odpi.egeria</groupId>
            <artifactId>data-manager-api</artifactId>
            <scope>provided</scope>
            <version>${open-metadata.version}</version>
        </dependency>

        <dependency>
            <groupId>org.odpi.egeria</groupId>
            <artifactId>audit-log-framework</artifactId>
            <scope>provided</scope>
            <version>${open-metadata.version}</version>
        </dependency>

        <dependency>
            <groupId>org.odpi.egeria</groupId>
            <artifactId>open-connector-framework</artifactId>
            <scope>provided</scope>
            <version>${open-metadata.version}</version>
        </dependency>

        <dependency>
            <groupId>org.odpi.egeria</groupId>
            <artifactId>repository-services-apis</artifactId>
            <scope>provided</scope>
            <version>${open-metadata.version}</version>
        </dependency>

        <dependency>
            <groupId>org.odpi.egeria</groupId>
            <artifactId>integration-daemon-services-api</artifactId>
            <scope>provided</scope>
            <version>${open-metadata.version}</version>
        </dependency>

Use provided scope

Notice the <scope>provided</scope> setting for the Egeria libraries. This prevents the Egeria libraries from being included in your connector jar file. By using the provided scope, your connector can run with any level of Egeria that supports this type of connector. Without it, duplicate Egeria classes would be loaded into your OMAG Server Platform and, if the platform was running at a different level, it is not certain which version of the classes would run. (It may work, but experience teaches us that if something can go wrong, it will, so it is better to avoid the problem altogether.)

You will also need to add the dependencies for the third party technology that your connector is calling.

All the integration connector base classes inherit from (extend) the IntegrationConnectorBase class. This class defines the lifecycle methods of the integration connector.

Methods implemented by an integration connector

The base class implements the initialize(), setAuditLog(), setConnectorName(), initializeEmbeddedConnectors() and setContext() methods. Your integration connector only needs to supply the start(), refresh() and disconnect() methods. It implements the engage() method only if it needs to issue blocking calls.

  • initialize is a standard method for all connectors that is called by the connector broker when a request is made to create an instance of the connector. The connector broker uses the initialize method to pass the connection object used to create the connector instance and a unique identifier for this instance of the connector. This method is provided by the integration connector's base class. Your code can access the connection properties via the connectionProperties variable and the connector's unique identifier via the connectorInstanceId variable.

  • setAuditLog provides an Audit Log Framework (ALF) compatible logging destination. This method is provided by the integration connector's base class. Your code can access the audit log via the auditLog variable.

  • setConnectorName provides the name of the connector from the configuration, so it can be used for logging. This method is provided by the integration connector's base class. Your code can access your integration connector's name via the connectorName variable.

  • initializeEmbeddedConnectors saves the optional list of embedded connectors that were defined in the connection object for your integration connector when it was configured. These connectors are digital resource connectors for use by your integration connector to call the third party technology. This method is provided by the integration connector's base class. Your code can access the embedded connectors via the embeddedConnectors variable.

  • setContext sets up the integration service specific context object. This method is also provided by the integration connector's base class. Your code can access the context via the context variable. However, because this variable is set to null after the disconnect method (described below) is called, it is recommended that your connector uses the super.getContext() method to access the context, particularly if your connector operates in multiple threads, which happens when it uses listeners.

  • start indicates that the connector is completely configured (that is, all the methods listed above have been called) and can begin processing. This call is where the configuration properties are extracted from the connection object. It can also be used to register with non-blocking services. For example, it can register a listener for events from the OMAS Out Topic through the context.

  • engage is used as an alternative to refresh when the connector needs to issue blocking calls to wait for new metadata. It is called from its own thread. It is recommended that the engage() method returns when each blocking call completes. The integration daemon then pauses for a second and calls engage() again. This pattern enables the calling thread to detect the shutdown of its hosting integration daemon server. The integration connector's base class implements this method to do nothing, so you only need to override it if your integration connector issues blocking calls.

  • refresh asks the connector to compare the metadata in the third party technology with the metadata in the open metadata repositories. Refresh is called from the connector's own thread:

    1. when the integration connector first starts,
    2. at intervals defined in the connector's configuration, and
    3. whenever an external REST API call explicitly requests a refresh of the connector.
  • disconnect is called when the server is shutting down. The connector should free up any resources that it holds since it is not needed any more. Once disconnect has been called the context is no longer valid.

Therefore, you typically implement the start, refresh and disconnect methods in your integration connector, and optionally override the engage method if your connector issues blocking calls.
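The engage() pattern described above (return after each blocking call, then let the daemon pause and call again) can be sketched in plain Java. This is not Egeria's API. EngagingConnector, QueueListeningConnector and DaemonLoop are hypothetical stand-ins, and a BlockingQueue polled with a timeout plays the role of the third party technology's blocking call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch only: these types model the engage() pattern and are not Egeria classes.
interface EngagingConnector
{
    /** Issue ONE blocking call, process its result and return. */
    void engage() throws InterruptedException;
}

/** Hypothetical connector whose "blocking call" is a timed poll on a queue of notifications. */
class QueueListeningConnector implements EngagingConnector
{
    final BlockingQueue<String> notifications = new LinkedBlockingQueue<>();
    final List<String>          processed     = new ArrayList<>();

    @Override
    public void engage() throws InterruptedException
    {
        // Wait for a bounded time so the call always returns, even when nothing arrives.
        String notification = notifications.poll(100, TimeUnit.MILLISECONDS);

        if (notification != null)
        {
            processed.add(notification);   // a real connector maps the notification to open metadata here
        }
    }
}

/** Hypothetical daemon loop: call engage(), pause, check for shutdown, repeat. */
final class DaemonLoop
{
    private DaemonLoop() { }

    static int run(EngagingConnector connector, BooleanSupplier shutdownRequested, long pauseMillis)
    {
        int calls = 0;

        try
        {
            while (! shutdownRequested.getAsBoolean())   // shutdown is noticed between engage() calls
            {
                connector.engage();
                calls++;
                Thread.sleep(pauseMillis);              // this page describes a one second pause
            }
        }
        catch (InterruptedException interrupted)
        {
            Thread.currentThread().interrupt();
        }

        return calls;
    }
}
```

Because each engage() call returns within the poll timeout, control regularly comes back to the loop, which is what lets the hosting server's shutdown be noticed even when the third party technology is silent.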

Designing your integration connector

There are five main design decisions to make before you start coding:

  • How is the work of the connector triggered: explicitly through the connection object contents, or by listening for events from either the third party technology or open metadata?
  • In which direction is the metadata synchronized? Is the third party technology the source of metadata, or is metadata from the open metadata ecosystem being pushed to the third party technology?
  • How are elements from the third party technology mapped to, and correlated with, the elements in open metadata?
  • If the third party technology is the source, should the metadata created in the open metadata ecosystem be read-only so that it cannot be changed by other tools? This is achieved using external source metadata provenance.
  • How is the third party technology to be called? Ideally, your integration connector uses an embedded digital resource connector to call it.

Three patterns for connections

Your integration connector is created and initialized with a connection object. This connection object should contain all the configuration needed by your integration connector. For example, it may contain configuration properties that can control the behavior of your connector. When connecting to the third party technology, optional userId and password for the third party technology may be stored in the connection along with endpoint information that defines the network address of its deployment.

Connection object with an explicit endpoint

An explicit endpoint is added to the integration connector's connection in its configuration to provide information on the network location of the third party technology. This is used to initialize the client libraries needed to call the third party technology.

A connection with no endpoint

If no endpoint is configured in the integration connector's connection, the endpoint information can be retrieved from open metadata by calling the context object and/or listening for notifications from the partner OMAS.

An alternative approach to calling the third party technology directly in your integration connector is to use one or more appropriate digital resource connectors to call the third party technology. The connection objects for these digital resource connectors are embedded in the connection object for the integration connector.

A virtual connection that includes an embedded connection

A Virtual Connection is a special type of connection that allows connections for different connectors to be embedded. This style of connection can be used by an integration connector that is making use of digital resource connectors to call its third party technology. Typically, there is only one embedded connection, but multiple embedded connections can be used. Also, the embedded connections themselves may be virtual connections.

Embedding a digital resource connector in an integration connector is an ideal approach for working with any third party technology that is being catalogued as assets:

  • Consumers of the digital resources in the third party technology need a digital resource connector to access the content of the digital resource. It may be possible to use the same digital resource connector in the integration connector.
  • Often, the integration connector is not the only connector that is accessing a particular third party technology. There may be open discovery services and governance action services that also need to access the third party technology once the integration connector has run to create the basic technical metadata.

For example, Egeria has a JDBC digital resource connector for accessing databases. It can be used by consumers of databases as well as various governance connectors that are cataloguing and managing databases.

Multiple uses of the JDBC digital resource connector

When the digital resource connectors are defined in a virtual connection (rather than being initialized in the integration connector logic), the integration daemon can manage the lifecycle of the embedded connectors with the lifecycle of the integration connectors, reducing the chances of memory leaks and held resources as the connectors/integration services/integration daemon are restarted over the lifetime of their hosting OMAG Server Platform.
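The benefit of having the hosting component own the lifecycle of the embedded connectors can be shown with a small sketch. Whatever was started is released, in reverse order, when the owner disconnects, and a failure part way through start() does not leak the connectors that were already started. Lifecycle, OwningConnector and RecordingConnector are hypothetical. In Egeria the real lifecycle methods are the ones described earlier, and they can throw checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only: a minimal lifecycle shared by an integration connector and its embedded connectors.
interface Lifecycle
{
    void start();
    void disconnect();
}

/** Hypothetical integration connector that owns the lifecycle of its embedded connectors. */
class OwningConnector implements Lifecycle
{
    private final List<? extends Lifecycle> embeddedConnectors;
    private final List<Lifecycle>           started = new ArrayList<>();

    OwningConnector(List<? extends Lifecycle> embeddedConnectors)
    {
        this.embeddedConnectors = embeddedConnectors;
    }

    @Override
    public void start()
    {
        for (Lifecycle embedded : embeddedConnectors)
        {
            embedded.start();
            started.add(embedded);   // only remember connectors whose start() succeeded
        }
    }

    @Override
    public void disconnect()
    {
        // Release in reverse order of start, and keep going even if one disconnect fails.
        RuntimeException firstFailure = null;
        for (int i = started.size() - 1; i >= 0; i--)
        {
            try
            {
                started.get(i).disconnect();
            }
            catch (RuntimeException failure)
            {
                if (firstFailure == null)
                {
                    firstFailure = failure;
                }
            }
        }
        started.clear();
        if (firstFailure != null)
        {
            throw firstFailure;
        }
    }
}

/** Hypothetical embedded connector that records its lifecycle calls. */
class RecordingConnector implements Lifecycle
{
    private final String       name;
    private final List<String> log;

    RecordingConnector(String name, List<String> log) { this.name = name; this.log = log; }

    @Override public void start()      { log.add("start " + name); }
    @Override public void disconnect() { log.add("disconnect " + name); }
}
```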

This pattern is not always possible, for example when the integration connector needs to use a different interface to access the third party technology's metadata from the one used to access its resources. The Kafka Monitor Integration Connector, which detects the creation of new Kafka topics and catalogues them in open metadata, does not use the Kafka Open Metadata Topic Connector because it uses a different Apache Kafka interface to do its work.

Metadata flow for your connector

The refresh method of your connector is called periodically to ensure the metadata in the third party technology is consistent with the metadata in the open metadata ecosystem. It operates in two phases:

  1. Retrieving metadata from the source and ensuring the equivalent metadata is present in the metadata destination.

  2. Retrieving metadata from the destination and deleting any elements that are not present in the source.
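The two phases can be sketched with in-memory maps standing in for the source and destination. TwoPhaseRefresh is a hypothetical helper keyed by element identifier. A real connector would call the third party technology's client and the context object rather than Map.put() and Map.remove().

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch only: element identifier -> element description, for both sides.
final class TwoPhaseRefresh
{
    private TwoPhaseRefresh() { }

    /**
     * @param source      elements in the system that owns the metadata
     * @param destination elements in the system being kept consistent (modified in place)
     * @return number of create, update and delete operations performed
     */
    static int refresh(Map<String, String> source, Map<String, String> destination)
    {
        int changes = 0;

        // Phase 1: ensure every source element is present, and up to date, in the destination.
        for (Map.Entry<String, String> element : source.entrySet())
        {
            String key = element.getKey();
            if (! destination.containsKey(key) || ! Objects.equals(element.getValue(), destination.get(key)))
            {
                destination.put(key, element.getValue());   // create or update
                changes++;
            }
        }

        // Phase 2: delete destination elements that are no longer present in the source.
        int sizeBefore = destination.size();
        destination.keySet().removeIf(key -> ! source.containsKey(key));
        changes += sizeBefore - destination.size();

        return changes;
    }
}
```

The same method covers both directions described below: pass the third party technology's elements as the source when it owns the metadata, or the relevant subset of open metadata when the open metadata ecosystem does. A second refresh with no intervening changes performs no operations.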

Third party technology is the metadata source

When the third party technology is the metadata source (for example, it is a relational database or a file system) the refresh method ensures that the open metadata in Egeria is exactly the same as the metadata in the third party technology.

Third party technology is the metadata destination

When the open metadata ecosystem is the metadata source and the integration connector is responsible for distributing a subset of the open metadata to the third party technology, the refresh method ensures this subset (and no more) is present in the third party technology.

Mapping the third party technology to open metadata

Your integration connector needs to be able to map between the elements in the third party technology and those in the open metadata ecosystem. Each uses its own unique identifiers, which you are unlikely to be able to control. Therefore, design the qualifiedName of the open metadata elements so that it can be constructed from the identifier of the equivalent metadata element in the third party technology.
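One way to make qualifiedNames constructable is to use a fixed naming convention that can be both built and parsed. With this convention the connector can look up the open metadata element for a third party element without keeping its own mapping table, and it can recognize its own elements when deciding what to delete. The "XXXStore::server::Table::id" convention and the XXXStoreQualifiedNames class are hypothetical; choose a convention that suits your technology.

```java
import java.util.Optional;

// Hypothetical naming convention: "XXXStore::<server name>::Table::<third party table id>".
final class XXXStoreQualifiedNames
{
    private static final String PREFIX    = "XXXStore";   // hypothetical; use your own organization's prefix
    private static final String SEPARATOR = "::";

    private XXXStoreQualifiedNames() { }

    private static String tablePrefix(String serverName)
    {
        return PREFIX + SEPARATOR + serverName + SEPARATOR + "Table" + SEPARATOR;
    }

    /** Build the qualifiedName of the open metadata element for a third party table. */
    static String forTable(String serverName, String tableId)
    {
        return tablePrefix(serverName) + tableId;
    }

    /** Recover the third party identifier, or empty if the name was not created by this convention. */
    static Optional<String> tableIdFrom(String serverName, String qualifiedName)
    {
        String expectedPrefix = tablePrefix(serverName);
        if (qualifiedName != null && qualifiedName.startsWith(expectedPrefix))
        {
            return Optional.of(qualifiedName.substring(expectedPrefix.length()));
        }
        return Optional.empty();
    }
}
```

Including the server name keeps elements from two deployments of the same technology distinct in open metadata.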

What if there is not a one-to-one correspondence between elements?

The Catalog Integrator OMIS supports external identifiers which can help to correlate complex relationships between the third party technology and open metadata.

Controlling external source metadata provenance

The configuration for an integration connector in the Integration Daemon includes a metadataSourceQualifiedName. The default value is null, which means that the metadata is stored in a metadata collection owned by the locally connected cohorts. Alternatively, it can be set to the qualifiedName of a software capability entity that represents the third party technology. This entity is automatically catalogued by the integration daemon if it is not found in the open metadata ecosystem. The guid and qualifiedName of this entity are used to identify the external metadata collection in which any open metadata elements created by the integration connector are stored. This prevents processes other than the integration connector from modifying the metadata elements.

Some integration services let the integration connector code use a boolean toggle to control which metadata collection is used when the metadataSourceQualifiedName is configured. If the toggle is set to true, the external metadata collection is used; otherwise, the local cohort's metadata collection is used.

| Integration Service | Method to control external source metadata provenance |
|---|---|
| API Integrator OMIS | Call the setAPIManagerIsHome() method to set the toggle. The default is true, which means external provenance is used if it is configured. |
| Catalog Integrator OMIS | Use the assetManagerIsHome property on method calls. |
| Database Integrator OMIS | External source metadata provenance is controlled by the configuration only. |
| Display Integrator OMIS | Call the setApplicationIsHome() method to set the toggle. The default is true, which means external provenance is used if it is configured. |
| Files Integrator OMIS | External source metadata provenance is controlled by the configuration. |
| Infrastructure Integrator OMIS | Call the setInfrastructureManagerIsHome() method to set the toggle. The default is true, which means external provenance is used if it is configured. |
| Lineage Integrator OMIS | Use the assetManagerIsHome property on method calls. |
| Organization Integrator OMIS | External source metadata provenance is controlled by the configuration only. |
| Security Integrator OMIS | External source metadata provenance is controlled by the configuration only. |
| Topic Integrator OMIS | Call the setEventBrokerIsHome() method to set the toggle. The default is true, which means external provenance is used if it is configured. |
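The way the configured metadataSourceQualifiedName combines with the toggle can be summarized as a small decision function. It models only the rules stated above. ProvenanceRules and MetadataCollectionChoice are hypothetical names, not Egeria classes, and the integration services make this decision for you.

```java
// Hypothetical sketch only: where newly created open metadata elements end up.
enum MetadataCollectionChoice { LOCAL_COHORT, EXTERNAL_SOURCE }

final class ProvenanceRules
{
    private ProvenanceRules() { }

    /**
     * @param metadataSourceQualifiedName value from the integration connector's configuration (may be null)
     * @param isHome the toggle, e.g. the value passed to setEventBrokerIsHome(); for services controlled
     *               by the configuration only, treat it as true
     */
    static MetadataCollectionChoice choose(String metadataSourceQualifiedName, boolean isHome)
    {
        if (metadataSourceQualifiedName == null)
        {
            return MetadataCollectionChoice.LOCAL_COHORT;   // nothing configured: a local cohort collection
        }
        return isHome ? MetadataCollectionChoice.EXTERNAL_SOURCE : MetadataCollectionChoice.LOCAL_COHORT;
    }
}
```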

Writing the connector provider

The purpose of the connector provider is to provide information on how to configure and initialize a particular connector implementation. It is both an information component that describes the properties of the connector during setup, and the factory class used to create an instance of the connector at runtime.

The ConnectorProvider interface

All connector providers implement the ConnectorProvider interface. This interface includes the getConnectorType() method that returns the ConnectorType that is added to a Connection object used to create an instance of the connector.

Connection object structure

The connection object contains the properties needed by the connector to operate. It includes a connector type object that is used when constructing the connector and an endpoint object that defines where the corresponding digital resource is located.

The connector type describes the capabilities of the connector such as:

  • the Java class of this connector provider. A connector provider is the factory for its connector. It is typically called from the connector broker. The connector broker uses the connectorProviderClassName in the connector type to create an instance of the connector provider.

  • the configurationProperties that can be added to the connector's connection object to adapt its behavior. The administrator who is configuring the connector uses the recognizedConfigurationProperties from the connector type to determine which properties can be set.

The ConnectorProvider interface also defines the getConnector() method called to construct an instance of the connector at runtime using the connection object.
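The factory role can be illustrated with plain Java reflection. The provider knows the connector's class name, creates the connector through its default constructor, and hands it its configuration. This is a hypothetical sketch: SimpleConnector, EchoConnector and SimpleConnectorProvider are not Egeria types, and a Map of configuration properties stands in for the Connection object. ConnectorProviderBase supplies the real implementation for Egeria connectors.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch only: a minimal connector contract, not Egeria's Connector interface.
interface SimpleConnector
{
    void initialize(String connectorInstanceId, Map<String, Object> configurationProperties);
    String describe();
}

/** Hypothetical connector; the provider requires the public no-argument constructor. */
class EchoConnector implements SimpleConnector
{
    private String              connectorInstanceId;
    private Map<String, Object> configurationProperties = Map.of();

    public EchoConnector() { }

    @Override
    public void initialize(String connectorInstanceId, Map<String, Object> configurationProperties)
    {
        this.connectorInstanceId     = connectorInstanceId;
        this.configurationProperties = configurationProperties;
    }

    @Override
    public String describe()
    {
        return "EchoConnector(" + configurationProperties.get("templateQualifiedName") + ")";
    }

    String getConnectorInstanceId() { return connectorInstanceId; }
}

/** Hypothetical provider: creates a connector instance from its class name. */
class SimpleConnectorProvider
{
    private final String connectorClassName;

    SimpleConnectorProvider(String connectorClassName) { this.connectorClassName = connectorClassName; }

    SimpleConnector getConnector(Map<String, Object> configurationProperties)
    {
        try
        {
            SimpleConnector connector = Class.forName(connectorClassName)
                                             .asSubclass(SimpleConnector.class)   // reject unrelated classes
                                             .getDeclaredConstructor()            // the default constructor
                                             .newInstance();
            connector.initialize(UUID.randomUUID().toString(), configurationProperties);
            return connector;
        }
        catch (ReflectiveOperationException | ClassCastException error)
        {
            throw new IllegalStateException("Unable to create connector " + connectorClassName, error);
        }
    }
}
```

This is why the connector implementation needs a default constructor and must take all of its configuration from the object supplied on initialize().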

Audit Logging

If the connector provider implements the AuditLoggingComponent interface, it is passed an audit log object used to create child audit log objects that are passed to each connector instance when it is created.

The connector provider is also able to return the ComponentDescription object used in each child audit log.

ConnectorProviderBase class

Egeria provides a base class for the connector provider called ConnectorProviderBase that implements the factory methods for a connector. It also stores all the properties needed by the information methods such as getConnectorType(). Most connector provider implementations use this base class and only need to pass appropriate values to initialize the properties for the information methods.

Each connector provider for an integration connector extends the IntegrationConnectorProvider base class, which in turn extends the standard ConnectorProviderBase:

This assumes the integration connector's implementation class is instantiated via the default constructor and all of its configuration information is contained in the Connection object supplied on the initialize() method.

If your connector implementation matches these requirements, its connector provider need only implement a constructor that configures the base class with details of itself and of the Java class of the connector it creates, using:

  • a GUID for the connector type.
  • a name for the connector type.
  • a description of what the connector is for and how to configure it.
  • the connector class it instantiates.
  • a list of the additional properties, configuration properties and secured properties needed to configure instances of the connector.
  • a description of the connector for its audit log (if the connector implements AuditLoggingComponent).
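import java.util.ArrayList;
import java.util.List;

/* Also import ConnectorType, AuditLogReportingComponent and IntegrationConnectorProviderBase from the Egeria libraries. */

/**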
 * XXXStoreProvider is the OCF connector provider for the XXX integration connector.
 */
public class XXXStoreProvider extends IntegrationConnectorProviderBase
{
    /*
     * Unique identifier of the connector for the audit log.
     */
    private static final int    connectorComponentId   = 10001; /* Add unique number here - Egeria uses numbers under 1000 */

    /*
     * Unique identifier for the connector type.
     */
    private static final String connectorTypeGUID      = "Add unique GUID here";

    /*
     * Descriptive information about the connector for the connector type and audit log.
     */
    private static final String connectorQualifiedName = "MyOrg:Integration:XXXStoreConnector";
    private static final String connectorDisplayName   = "XXX Store Connector";
    private static final String connectorDescription   = "Connector supports ... add details here.";
    private static final String connectorWikiPage      = "Add url to documentation here";


    /*
     * Define the name of the connector implementation.
     */
    private static final String connectorClassName = "packagename.XXXStoreConnector";

    /*
     * Define the name of configuration properties (optional).
     */
    public static final String TEMPLATE_QUALIFIED_NAME_CONFIGURATION_PROPERTY = "templateQualifiedName";

    /**
     * Constructor used to initialize the ConnectorProviderBase class.
     */
    public XXXStoreProvider()
    {
        super();

        /*
         * Set up the class name of the connector that this provider creates.
         */
        super.setConnectorClassName(connectorClassName);

        /*
         * Set up the connector type that should be included in a connection used to configure this connector.
         */
        ConnectorType connectorType = new ConnectorType();
        connectorType.setType(ConnectorType.getConnectorTypeType());
        connectorType.setGUID(connectorTypeGUID);
        connectorType.setQualifiedName(connectorQualifiedName);
        connectorType.setDisplayName(connectorDisplayName);
        connectorType.setDescription(connectorDescription);
        connectorType.setConnectorProviderClassName(this.getClass().getName());

        List<String> recognizedConfigurationProperties = new ArrayList<>();
        recognizedConfigurationProperties.add(TEMPLATE_QUALIFIED_NAME_CONFIGURATION_PROPERTY);
        connectorType.setRecognizedConfigurationProperties(recognizedConfigurationProperties);

        super.connectorTypeBean = connectorType;

        /*
         * Set up the component description used in the connector's audit log messages.
         */
        AuditLogReportingComponent componentDescription = new AuditLogReportingComponent();

        componentDescription.setComponentId(connectorComponentId);
        componentDescription.setComponentName(connectorQualifiedName);
        componentDescription.setComponentDescription(connectorDescription);
        componentDescription.setComponentWikiURL(connectorWikiPage);

        super.setConnectorComponentDescription(componentDescription);
    }
}
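The template leaves connectorTypeGUID for you to fill in. One simple approach, assuming a random (version 4) UUID is acceptable for your connector type, is to generate the value once with the JDK and paste it into the provider as a constant. Generating it at runtime instead would give the connector type a different GUID every time the platform starts. The ConnectorTypeGUIDGenerator class is a hypothetical helper, not part of Egeria.

```java
import java.util.UUID;

// Hypothetical helper: run once, then copy the printed value into connectorTypeGUID.
final class ConnectorTypeGUIDGenerator
{
    private ConnectorTypeGUIDGenerator() { }

    static String newGUID()
    {
        return UUID.randomUUID().toString();   // lower-case, hyphenated, version 4 UUID
    }

    public static void main(String[] args)
    {
        System.out.println(newGUID());
    }
}
```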

Example: connector provider for the Kafka Monitor Integration Connector

For example, the KafkaMonitorIntegrationProvider is used to instantiate connectors that are monitoring an Apache Kafka broker. Therefore, its name and description refer to Kafka, and the connectors it instantiates are of type KafkaMonitorIntegrationConnector.

Writing the connector

The connector extends the appropriate integration connector base class and has a default constructor:

public class MyIntegrationConnector extends FilesIntegratorConnector
{

    /**
     * Default constructor used by the connector provider.
     */
    public MyIntegrationConnector()
    {
        super();
    }

}

Accessing configuration properties and the endpoint

The connection object is stored in the connectionProperties instance variable defined by the superclass. It is typically accessed in the start() method. The code snippet below extracts the endpoint and configuration properties from the connection. It also uses the audit log to record a start-up message.

    /**
     * Indicates that the connector is completely configured and can begin processing.
     * This call can be used to register with non-blocking services.
     *
     * @throws ConnectorCheckedException there is a problem within the connector.
     */
    @Override
    public void start() throws ConnectorCheckedException
    {
        super.start();

        final String methodName = "start";

        /*
         * Extract the configuration
         */
        EndpointProperties  endpoint = connectionProperties.getEndpoint();

        if (endpoint != null)
        {
            myEndpoint = endpoint.getAddress();
        }

        Map<String, Object> configurationProperties = connectionProperties.getConfigurationProperties();

        if (configurationProperties != null)
        {
            :
        }

        /*
         * Record the configuration
         */
        if (auditLog != null)
        {
            auditLog.logMessage(methodName,
                                MyConnectorsAuditCode.CONNECTOR_CONFIGURATION.getMessageDefinition(connectorName,myEndpoint));
        }

        :
    }
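The configuration-extraction step elided in start() above usually reduces to reading typed values with defaults. The helper below is a hypothetical sketch, not part of Egeria. It tolerates a null map and missing keys, and it accepts numbers supplied either as numbers or as strings, since that depends on how the configuration was provided. templateQualifiedName comes from the provider example; pollingInterval and maxTopics are made-up property names.

```java
import java.util.Map;

// Hypothetical helper for reading typed values from a connection's configuration properties.
final class ConfigurationProperties
{
    private ConfigurationProperties() { }

    static String getString(Map<String, Object> properties, String name, String defaultValue)
    {
        if (properties == null)
        {
            return defaultValue;
        }
        Object value = properties.get(name);
        return (value == null) ? defaultValue : value.toString();
    }

    static int getInt(Map<String, Object> properties, String name, int defaultValue)
    {
        if (properties == null)
        {
            return defaultValue;
        }
        Object value = properties.get(name);
        if (value instanceof Number)
        {
            return ((Number) value).intValue();   // note: fractional values are truncated
        }
        if (value instanceof String)
        {
            try
            {
                return Integer.parseInt(((String) value).trim());
            }
            catch (NumberFormatException notANumber)
            {
                return defaultValue;
            }
        }
        return defaultValue;
    }
}
```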

Accessing context

The context is retrieved using the getContext() method. This is a synchronized method, so it can safely be called from multiple threads, which happens when the connector is using listeners.
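A small model shows why it is recommended to call getContext() each time rather than caching the context. The classes are hypothetical (they are not Egeria's base classes), but they follow the behavior described on this page: the context is set before the connector starts, is set to null after disconnect, and is read under a lock, so a listener thread always sees the current value.

```java
// Hypothetical sketch only: stand-ins for a context and a connector base class.
class SketchContext
{
    String describe() { return "context"; }
}

class SketchConnectorBase
{
    private SketchContext context;

    synchronized void setContext(SketchContext context) { this.context = context; }

    /** Synchronized so listener threads and the daemon's thread see a consistent value. */
    synchronized SketchContext getContext() { return context; }

    /** After disconnect the context is no longer valid. */
    synchronized void disconnect() { context = null; }
}

class SketchListeningConnector extends SketchConnectorBase
{
    /** Called on a listener thread: fetch the context each time and cope with it having gone away. */
    String onEvent(String event)
    {
        SketchContext current = getContext();
        if (current == null)
        {
            return "ignored " + event + " (disconnected)";
        }
        return "processed " + event + " with " + current.describe();
    }
}
```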

Registering a listener with open metadata

An integration connector that is listening for events from the open metadata ecosystem should implement the listener interface for the associated access service. This interface has a processEvent() method that your connector implements.

| Integration Service | Partner OMAS | Listener Interface |
|---|---|---|
| API Integrator OMIS | Data Manager OMAS | DataManagerEventListener |
| Catalog Integrator OMIS | Asset Manager OMAS | AssetManagerEventListener |
| Database Integrator OMIS | Data Manager OMAS | DataManagerEventListener |
| Display Integrator OMIS | Data Manager OMAS | DataManagerEventListener |
| Files Integrator OMIS | Data Manager OMAS | DataManagerEventListener |
| Infrastructure Integrator OMIS | IT Infrastructure OMAS | ITInfrastructureEventListener |
| Lineage Integrator OMIS | Asset Manager OMAS | AssetManagerEventListener |
| Organization Integrator OMIS | Community Profile OMAS | CommunityProfileEventListener |
| Security Integrator OMIS | Security Manager OMAS | SecurityManagerEventListener |
| Topic Integrator OMIS | Data Manager OMAS | DataManagerEventListener |

Your integration connector registers itself as a listener in the start() method, and the processEvent() method is called each time an event occurs. The event type passed on processEvent() depends on the OMIS that the connector is using. In the example, the event type comes from the Asset Manager OMAS, so the connector is using either the Catalog Integrator OMIS or the Lineage Integrator OMIS.

    /**
     * Indicates that the connector is completely configured and can begin processing.
     *
     * @throws ConnectorCheckedException there is a problem within the connector.
     */
    @Override
    public synchronized void start() throws ConnectorCheckedException
    {
        super.start();

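        // ... other start-up processing ...

        myContext = super.getContext();

        if (myContext != null)
        {
            /*
             * Register this connector to receive events from the partner OMAS's Out Topic.
             */
            myContext.registerListener(this);
        }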
    }

    /**
     * Process an event that was published by the Asset Manager OMAS.  This connector is only interested in
     * glossaries, glossary categories and glossary terms.   The listener is only registered if metadata is flowing
     * from the open metadata ecosystem to Apache Atlas.
     *
     * @param event event object
     */
    @Override
    public void processEvent(AssetManagerOutTopicEvent event)
    {
        /*
         * Only process events if refresh() is not running because the refresh() process creates lots of events and
         * proceeding with event processing at this time causes elements to be processed multiple times.
         */
        if (! myContext.isRefreshInProgress())
        {
            // ... process glossary, glossary category and glossary term events ...
        }
    }
The isRefreshInProgress() call ensures that this connector ignores events while its refresh() method is running. For many connectors, many of the events published during this time are caused by the connector's own activity, so ignoring them avoids processing the same elements multiple times.
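The effect of the refresh guard can be seen in a small self-contained sketch. It does not use Egeria: SketchContext, SketchDaemon and SketchListeningConnector are hypothetical stand-ins, and events are plain strings rather than AssetManagerOutTopicEvent objects. SketchDaemon.callRefresh() assumes that the daemon sets the "refresh in progress" flag only for the duration of the refresh() call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the integration context: it only tracks whether refresh() is running.
class SketchContext
{
    private volatile boolean refreshInProgress = false;

    void setRefreshInProgress(boolean inProgress)
    {
        refreshInProgress = inProgress;
    }

    boolean isRefreshInProgress()
    {
        return refreshInProgress;
    }
}

// Hypothetical stand-in for the integration daemon: it sets the flag for the duration
// of the connector's refresh() call and always clears it afterwards.
class SketchDaemon
{
    static void callRefresh(SketchContext context, Runnable refresh)
    {
        context.setRefreshInProgress(true);
        try
        {
            refresh.run();
        }
        finally
        {
            context.setRefreshInProgress(false);
        }
    }
}

// Hypothetical listening connector that records the events it chose to process.
class SketchListeningConnector
{
    private final SketchContext myContext;
    final List<String> processedEvents = new ArrayList<>();

    SketchListeningConnector(SketchContext context)
    {
        this.myContext = context;
    }

    void processEvent(String event)
    {
        // Skip events that arrive while refresh() is running; many are echoes of the connector's own updates.
        if (! myContext.isRefreshInProgress())
        {
            processedEvents.add(event);
        }
    }
}
```

An event delivered from inside SketchDaemon.callRefresh(...) is dropped. The same event delivered after the refresh completes is recorded.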

Working with the third party technology

Ideally your integration connector should use an embedded digital resource connector. This is configured as an embedded connection in the VirtualConnection used to configure the integration connector. When the integration connector is initialized, the embedded connections are used to create the embedded connectors. They can be accessed via the embeddedConnectors variable. As an example, here is the start() method from the OpenLineage Event Receiver Integration Connector which uses an embedded Open Metadata Topic Connector to access an event source:

    private final Map<String, OpenMetadataTopicConnector> topicConnectors = new HashMap<>();

    /**
     * Indicates that the connector is completely configured and can begin processing.
     * This call can be used to register with non-blocking services.
     *
     * @throws ConnectorCheckedException there is a problem within the connector.
     */
    @Override
    public void start() throws ConnectorCheckedException
    {
        super.start();

        final String methodName = "start";

        myContext = super.getContext();

        if (myContext != null)
        {
            if (embeddedConnectors != null)
            {
                for (Connector embeddedConnector : embeddedConnectors)
                {
                    if (embeddedConnector instanceof OpenMetadataTopicConnector)
                    {
                        /*
                         * Register this connector as a listener of the event bus connector.
                         */
                        OpenMetadataTopicConnector topicConnector = (OpenMetadataTopicConnector)embeddedConnector;
                        topicConnector.registerListener(this);

                        ConnectionProperties connectionProperties = topicConnector.getConnection();

                        if (connectionProperties != null)
                        {
                            EndpointProperties endpoint = connectionProperties.getEndpoint();

                            if (endpoint != null)
                            {
                                topicConnectors.put(endpoint.getAddress(), topicConnector);
                            }
                        }
                    }
                }
            }

            for (String topicName : topicConnectors.keySet())
            {
                OpenMetadataTopicConnector topicConnector = topicConnectors.get(topicName);
                ConnectionProperties       topicConnection = topicConnector.getConnection();

                /*
                 * Record the configuration
                 */
                if (auditLog != null)
                {
                    auditLog.logMessage(methodName,
                                        OpenLineageIntegrationConnectorAuditCode.KAFKA_RECEIVER_CONFIGURATION.getMessageDefinition(connectorName,
                                                                                                                                   topicName,
                                                                                                                                   topicConnection.getConnectionName()));
                }

                topicConnector.start();
            }
        }
    }
Notice that embeddedConnectors is a list, since multiple connectors can be embedded. The integration connector ignores any embedded connectors that are not OpenMetadataTopicConnectors.

The OpenLineage Event Receiver Integration Connector also demonstrates how to register a listener with an Open Metadata Topic Connector to receive events from an event broker such as Apache Kafka.

The integration connector implements OpenMetadataTopicListener.

    public class OpenLineageEventReceiverIntegrationConnector extends LineageIntegratorConnector implements OpenMetadataTopicListener
    {
        // ... fields and other connector methods ...

        /**
         * Method to pass an event received on topic.
         *
         * @param event inbound event
         */
        @Override
        public void processEvent(String event)
        {
            // ... parse and process the inbound OpenLineage event ...
        }
    }
In its start() method, it then retrieves the embedded Open Metadata Topic Connector from embeddedConnectors and calls registerListener(this), followed by start(), on the topic connector.

    topicConnector.registerListener(this);
    // ...
    topicConnector.start();
Once topicConnector.start() is called, the integration connector will receive events from Apache Kafka.
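The following minimal, self-contained sketch shows the register-then-start ordering. It is not Egeria code. SketchTopicListener, SketchTopicConnector and SketchReceiverConnector are hypothetical stand-ins for OpenMetadataTopicListener, OpenMetadataTopicConnector and the integration connector. The deliver() method simulates an event arriving from the broker, and it assumes that events arriving before start() are simply not delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the topic listener interface: one callback per event received on the topic.
interface SketchTopicListener
{
    void processEvent(String event);
}

// Hypothetical stand-in for an embedded topic connector.
// Events are only passed to registered listeners once start() has been called.
class SketchTopicConnector
{
    private final List<SketchTopicListener> listeners = new ArrayList<>();
    private boolean started = false;

    void registerListener(SketchTopicListener listener)
    {
        listeners.add(listener);
    }

    void start()
    {
        started = true;
    }

    // Simulates an event arriving from the event broker.
    void deliver(String event)
    {
        if (started)
        {
            for (SketchTopicListener listener : listeners)
            {
                listener.processEvent(event);
            }
        }
    }
}

// Hypothetical integration connector: registers with the topic connector, then starts it.
class SketchReceiverConnector implements SketchTopicListener
{
    final List<String> received = new ArrayList<>();

    void start(SketchTopicConnector topicConnector)
    {
        topicConnector.registerListener(this);   // register first ...
        topicConnector.start();                  // ... then start the flow of events
    }

    @Override
    public void processEvent(String event)
    {
        received.add(event);
    }
}
```

Registering before starting means no event can reach the topic connector before a listener is in place to process it.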

Do not create threads in your integration connector

Each integration connector runs in its own thread. Integration connectors should not create additional threads, because this makes it difficult for Egeria to shut down the integration daemon properly, independently of the OMAG Server Platform. If the connector needs to make blocking calls to the third party technology, it should implement the engage() method and set the usesBlockingCalls property to true in the integration daemon configuration. Each time engage() is called, it should issue one blocking call and return. The integration daemon then checks whether it is shutting down and, if it is still running, calls engage() again.
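The engage() pattern can be sketched without Egeria. In this sketch, SketchBlockingConnector and SketchDaemonLoop are hypothetical stand-ins. A BlockingQueue plays the role of the third party technology's notification source, and the 100 ms timeout is an arbitrary assumption. Each engage() call makes exactly one blocking call and returns. The loop only imitates what the daemon is described as doing: it keeps calling engage() until shutdown is requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical connector: each engage() call makes one blocking call and then returns.
class SketchBlockingConnector
{
    // Stand-in for the third party technology's notification source.
    private final BlockingQueue<String> notifications;
    final List<String> handled = new ArrayList<>();

    SketchBlockingConnector(BlockingQueue<String> notifications)
    {
        this.notifications = notifications;
    }

    void engage()
    {
        try
        {
            // One blocking call, bounded by a timeout so control returns to the daemon regularly.
            String notification = notifications.poll(100, TimeUnit.MILLISECONDS);

            if (notification != null)
            {
                handled.add(notification);
            }
        }
        catch (InterruptedException error)
        {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
        }
    }
}

// Hypothetical stand-in for the daemon's loop for a connector configured with usesBlockingCalls=true.
class SketchDaemonLoop
{
    private volatile boolean shuttingDown = false;

    void shutdown()
    {
        shuttingDown = true;
    }

    void run(SketchBlockingConnector connector)
    {
        // Check for shutdown between calls; keep calling engage() while still running.
        while ((! shuttingDown) && (! Thread.currentThread().isInterrupted()))
        {
            connector.engage();
        }
    }
}
```

The timeout is a design choice of this sketch. It limits how long the loop can go without rechecking the shutdown flag, so the connector never holds its thread indefinitely.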

Exceptions and error handling

The methods of the integration connector can throw ConnectorCheckedException to indicate a problem. If your integration connector throws this exception, the integration daemon switches it to FAILED status. It is not called again until either the operator restarts the connector or the integration daemon is restarted.

Therefore, when your integration connector discovers a problem, it can either return from the method in the hope that the problem is resolved by the next time it is called, or it can throw an exception. In either case it should log an audit log message. If the error needs operator action to resolve it, throwing a ConnectorCheckedException stops the integration connector from needlessly consuming resources while it cannot operate. This is important if multiple failures are occurring and the ecosystem is under stress. However, throwing an exception for a temporary error that will resolve itself takes the integration connector offline unnecessarily and creates work for the operators.

The integration connector should only catch exceptions that inherit from java.lang.Exception, not java.lang.Error or java.lang.Throwable, since errors are something that need to be handled by the broader runtime environment.
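The following self-contained sketch shows the "return or throw" decision. It is not Egeria code. SketchConnectorCheckedException stands in for ConnectorCheckedException. SketchTransientException and SketchPermanentException are hypothetical ways a third party client might distinguish a temporary failure from one that needs an operator. A simple list stands in for the audit log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ConnectorCheckedException.
class SketchConnectorCheckedException extends Exception
{
    SketchConnectorCheckedException(String message, Throwable cause)
    {
        super(message, cause);
    }
}

// Hypothetical: a failure that may clear by itself (for example, a timeout).
class SketchTransientException extends Exception
{
    SketchTransientException(String message)
    {
        super(message);
    }
}

// Hypothetical: a failure that needs an operator (for example, rejected credentials).
class SketchPermanentException extends Exception
{
    SketchPermanentException(String message)
    {
        super(message);
    }
}

// Hypothetical client for the third party technology; returns the number of updates made.
interface SketchThirdPartyClient
{
    int synchronizeMetadata() throws SketchTransientException, SketchPermanentException;
}

class SketchErrorHandlingConnector
{
    private final SketchThirdPartyClient client;
    final List<String> auditMessages = new ArrayList<>();   // stand-in for the audit log

    SketchErrorHandlingConnector(SketchThirdPartyClient client)
    {
        this.client = client;
    }

    void refresh() throws SketchConnectorCheckedException
    {
        try
        {
            int updates = client.synchronizeMetadata();
            auditMessages.add("refresh complete: " + updates + " updates");
        }
        catch (SketchTransientException error)
        {
            // Log and return: the next refresh() call may succeed.
            auditMessages.add("temporary error, will retry: " + error.getMessage());
        }
        catch (SketchPermanentException error)
        {
            // Log and throw: the daemon marks the connector FAILED until it is restarted.
            auditMessages.add("operator action needed: " + error.getMessage());
            throw new SketchConnectorCheckedException("cannot continue", error);
        }
        // Unchecked exceptions and java.lang.Error are not caught here; they propagate to the caller.
    }
}
```

Only the failure that an operator must fix takes the connector offline. The temporary failure is logged and retried on the next call.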

Audit log messages

Audit log messages help the people operating Egeria confirm that your integration connector is not being called too frequently and can access all the resources it needs. It is recommended that your integration connector outputs audit log messages in the following places:

  • At the end of the start() method to confirm the resources and options it has been configured with.
  • At the start and end of the refresh() method to show when it ran. It is helpful to summarise the number of updates made to open metadata or the third party technology, so it is possible to judge if it is being called at the right frequency.
  • At the end of the disconnect() method to confirm it has shut down.
  • If the integration connector detects an error. This message should include the error information from the third party technology to aid diagnosis of the problem.
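The recommended placement of audit messages can be sketched in a self-contained way. SketchAuditLog is a hypothetical stand-in that records plain strings. Egeria's real audit log works with message definitions such as the OpenLineageIntegrationConnectorAuditCode shown earlier. The targetRootURL option is also hypothetical, and refresh() takes its changed elements as a parameter only to simulate what would normally be read from the third party technology.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical audit log stand-in that records messages so they can be inspected.
class SketchAuditLog
{
    final List<String> messages = new ArrayList<>();

    void logMessage(String actionDescription, String message)
    {
        messages.add(actionDescription + ": " + message);
    }
}

class SketchAuditedConnector
{
    private final SketchAuditLog auditLog;
    private final String connectorName;
    private final String targetRootURL;   // hypothetical configuration option

    SketchAuditedConnector(SketchAuditLog auditLog, String connectorName, String targetRootURL)
    {
        this.auditLog = auditLog;
        this.connectorName = connectorName;
        this.targetRootURL = targetRootURL;
    }

    void start()
    {
        // ... connect to the third party technology ...
        // End of start(): confirm the configuration in use.
        auditLog.logMessage("start", connectorName + " configured with targetRootURL=" + targetRootURL);
    }

    void refresh(List<String> changedElements)
    {
        auditLog.logMessage("refresh", connectorName + " refresh starting");

        int updateCount = 0;
        for (String element : changedElements)
        {
            // ... map the element to open metadata ...
            updateCount++;
        }

        // Summarise the work done so operators can judge whether the refresh frequency is right.
        auditLog.logMessage("refresh", connectorName + " refresh complete: " + updateCount + " updates");
    }

    void disconnect()
    {
        // ... release resources ...
        auditLog.logMessage("disconnect", connectorName + " has shut down");
    }
}
```

A run that calls start(), one refresh() with two changed elements, and disconnect() produces four messages. The third of these reports "2 updates".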

Testing your connector

Your integration connector implementation should be built and packaged in a jar file. This jar file contains your connector provider and connector implementation. It may optionally contain any client libraries for the third party technology that are called directly by your integration connector. This is necessary if these client libraries are not available in their own jar file.

The connector jar file, and any jar files for dependent third party client libraries not included in it, need to be added to the OMAG Server Platform class path. The easiest way to do this is to copy the jar files into the extra directory of your OMAG Server Platform's assembly.

Once you have installed the connector, configure it in an integration daemon that is connected to a metadata access store.

Figure 6

Your connector is then able to start and exchange metadata.

Figure 7

Documenting your connector

All connectors should be documented in some form of connector catalog to ensure they are easy for others to reuse. If your connector is either part of Egeria, or available from a public download, you may advertise it in Egeria's connector catalog.

Further information
