Platform Cache

Introduction

In this blog post, I am going to explain “Platform Cache ” and how you are going to use. You may use many ways to improve your pages to run quickly by using custom settings and view states reduce technicians and so on.  But with new platform cache, you can store Salesforce session and org data for later access and applications can run faster because they store reusable data in memory.

How does Platform Cache work?

Platform Cache uses local cache and a least recently used (LRU) algorithm to improve performance.The local cache is the application server’s in-memory container that the client interacts with during a request. Cache operations don’t interact with the caching layer directly but instead interact with the local cache.For session cache, all cached items are loaded into local cache upon the first request. All subsequent interactions use the local cache. Similarly, an org cache gets operation retrieves a value from the caching layer and stores it in the local cache. Subsequent requests for this value are retrieved from the local cache. Platform Cache uses an LRU algorithm to evict keys from the cache. When cache limits are reached, keys are evicted until the cache is reduced to 100-percent capacity. If session cache is used, the system removes cache evenly from all existing session cache instances. The local cache also uses an LRU algorithm. When the maximum local cache size for a partition is reached, the least recently used items are evicted from the local cache

 Types Of Platform Cache: –

Platform cache supports two types of caches

  • Session cache—Stores data for individual user sessions. For example, in an app that finds customers within specified territories, the calculations that run while users browse different locations on a map are reused.

    Session cache lives alongside a user session. The maximum life of a session is eight hours. Session cache expires when its specified time-to-live (ttlsecs value) is reached or when the session expires after eight hours, whichever comes first.

  • Org cache—Stores data that any user in an org reuses. For example, the contents of navigation bars that dynamically display menu items based on user profile are reused.

    Unlike session cache, the org cache is accessible across sessions, requests, and org users and profiles. Org cache expires when its specified time-to-live (ttlsecs value) is reached.

Distribute the cache with Partitions

Partitions allow you to improve the performance by distributing cache space in the way that works best for your applications. after setting up the partitions you can add, access, and remove data from them using the Platform Cache Apex API.In order to use Platform Cache, create at least one partition. Each partition has one session cache and one org cache segment and you can allocate separate capacity to each segment. Session cache can be used to store data for individual user sessions, and the org cache is for data that any users in an org can access. You can distribute your org’s cache space across any number of partitions. Session and org cache allocations can be zero, or five or greater, and they must be whole numbers. The sum of all partition allocations, including the default partition, equals the Platform Cache total allocation. The total allocated capacity of all cache segments must be less than or equal to the org’s overall capacity.

After you set up partitions, you can use Apex code to perform cache operations on a partition. For example, use the Cache.SessionPartition and Cache.OrgPartition classes to put, retrieve, or remove values from a specific partition’s cache. Use Cache.Session and Cache.Org to get a partition or perform cache operations by using a fully qualified key.

To access the Partition tool in Setup, enter Platform Cache in the Quick Find box, then select Platform Cache. Click new Platform Cache Partition . each Partition should have a session Cache and org Cache. enter Partition name and label as “partition”

PC1

 

Handling Session cache

Use the Cache.Session and Cache.SessionPartition classes to manage values in the session cache. To manage values in any partition, use the methods in the Cache.Session class. If you’re managing cache values in one partition, use the Cache.SessionPartition methods instead.

// Add a value to the cache . local is the default name space cache
Cache.Session.put('local.partition.key', '1234567');
if (Cache.Session.contains('local.partition.key')) {
String key = (String)Cache.Session.get('local.partition.key');
}
// Dafualt cache paritions
Cache.Session.put('key', '123456');
if (Cache.Session.contains('key')) {
String key = (String)Cache.Session.get('key');
}

// Get a cached value
String val = (String)Cache.Session.get('local.partition.key');

 

If you’re managing cache values in one partition, use the Cache.SessionPartition methods instead. After the partition object is obtained, the process of adding and retrieving cache values is similar to using the Cache.Session methods. The Cache.SessionPartition methods are easier to use because you specify only the key name without the namespace and partition prefix.

// Get partition
Cache.SessionPartition sessionPart = Cache.Session.getPartition('local.Partition');
// Retrieve cache value from the partition
if (sessionPart.contains('key')) {
String cachedTitle = (String)sessionPart.get('key');
}
// Add cache value to the partition
sessionPart.put('value', 'welcome');

 

 

Handling Org Cache

Use the Cache.Org and Cache.OrgPartition classes to manage values in the org cache. To manage values in any partition, use the methods in the Cache.Org class. If you’re managing cache values in one partition, use the Cache.OrgPartitionmethods instead.

// Add a value to the cache

Cache.Org.put('local.partition.key', 'Hello ');
if (Cache.Org.contains('local.partition.key')) {
String key = (String)Cache.Org.get('local.partition.key');
}

 

If you’re managing cache values in one partition, use the Cache.OrgPartition methods instead. After the partition object is obtained, the process of adding and retrieving cache values is similar to using the Cache.Org methods. The Cache.OrgPartition methods are easier to use because you specify only the key name without the namespace and partition prefix.

// Get partition
Cache.OrgPartition orgPart = Cache.Org.getPartition('local.partition');
// Retrieve cache value from the partition
if (orgPart.contains('key')) {
String key = (String)orgPart.get('key');
}
// Add cache value to the partition
orgPart.put('value','welcome');

 

Diagnose Platform cache

Cache diagnoses will provide the information about how much cache is used.The Diagnostics page provides valuable information, including the capacity usage, keys, and serialized and compressed sizes of the cached items. The session cache and org cache have separate diagnostics pages. The session cache diagnostics are per session, and they don’t provide insight across all active sessions.

Here is the simple code that used to store and retrieve the conversion exchange rates from the web services. Simple it checks the key is present in the Cache, it is going to retrieve the values from the platform chance otherwise it will store the values in the cache. so that you no need to make Webservice call even time to get the real time conversion rates

public class PlatformCacheController {
    Cache.OrgPartition orgPart ;
    
    public PlatformCacheController(){
        orgPart  = Cache.Org.getPartition('local.partition');
        //http://apilayer.net/api/live?access_key=456fab5d3bee967f81169416e234387e&currencies=EUR,GBP,CAD,PLN&source=USD&format=1        
    }
    
    public String fetchData(String fromCurrency , String toCurrency ){
        String keytoStoreorRet =fromCurrency+toCurrency;
        If(checkKeyInCache(keytoStoreorRet)){
            return (String) orgPart.get(keytoStoreorRet);
        }else{
            HttpRequest req = new HttpRequest();
            req.setEndpoint('http://apilayer.net/api/live?access_key=456fab5d3bee967f81169416e234387e&currencies='+toCurrency+'&source='+fromCurrency+'&format=1');
            req.setMethod('GET') ; 
            Http h = new Http();
            HttpResponse resp =  h.send(req) ; 
            
            orgPart.put(keytoStoreorRet , resp.getBodyAsBlob());
            return resp.getBody() ;
        }
        
    }
    
    
    public void updateKeyinCache(String key ,String values){
        if(!checkKeyInCache(key)){
            orgPart.put(key, values);
        }
    }
    public boolean checkKeyInCache(String key){
        if (orgPart.contains(key)) {
            return true ; 
        }else{
            return false ; 
        }
        
    }
    
}

 

Considerations of platform cache and best practices 

  • Cache isn’t persisted. and there is no guaranty on data lost.Make sure you should handle the data loss properly. You can use CacheBuilder to handle the data losses.
  • Decided what type of data access you need like Concurrent vs serial retrieve and update. Org cache supports concurrent reads and writes across multiple simultaneous Apex transactions
  • think how to handle cache misses like Using CacheBuiler or you can use your own retrieval or update cache handling
  • Not all the data need to be stored in the cache. Including more data in the cache may impact performance. In case if you need to store the bulk data, split and store into multiple keys
  • Use the cache to store static data or data that doesn’t change often rather than changing the data very often

 

 

 

 

 

 

 

Advertisement

Node JS Streaming API examples

In this blog, I am going to explain the how to test the node js application with Salesforce Streaming API. Here I am going to use the nforce, express, and socket.io along with ejs views. Please refer this link to understand the streaming API .Salesforce Streaming API

Step1: – I am assuming you already created a push topic in Salesforce as per the above link.

Step 2: Create a Connect App for authentication from App menu as shown below

Capture

Step 3: – Clone the Complete Repository from the git hub. here is the link
https://github.com/rajamohanvakati/Node-Js-Steaming-
Open auth.js file from Auth folder and update the details as shown below .

exports.PORT = 3001;
exports.DEBUG = true
exports.ENVIRONMENT = ‘production’;
exports.CALLBACK_URL = ‘http://localhost:3001’;
exports.PUSH_TOPIC = ‘OpportunityChannel’;
exports.CLIENT_ID =””;
exports.CLIENT_SECRET = “”;
exports.USERNAME = “”;
exports.PASSWORD = “”

Here is the index.js file that contains the logic to connect the Salesforce push topic and establish the socket connection. Once Streaming API topic is receiving any message then socket.io emit that message to index.ejs where we are showing the push notification. This is completed index.js

var express = require('express');
var nforce = require('nforce');
var path = require('path');
var app = express();

var server = require('http').Server(app);
// attach socket.io and listen
var io = require('socket.io')(server);

var config = require('./Auth/auth.js');
var sfConn = nforce.createConnection({
  clientId: config.CLIENT_ID, //Connected app clientId
  clientSecret: config.CLIENT_SECRET, // Connected app clientSecret
  redirectUri: config.CALLBACK_URL + '/oauth/_callback', // call back URL
  environment: config.ENVIRONMENT // optional, sandbox or production, production default
});
sfConn.authenticate({
  username: config.USERNAME, //  salesforce User name
  password: config.PASSWORD // Salesforce password
}, function(error, oauth) {
  if (error) return console.log(error);
  if (!error) {
    console.log('*** Successfully connected to Salesforce ***');
  }
  var streamingConnect = sfConn.stream({
    topic: config.PUSH_TOPIC,
    oauth: oauth
  });
  streamingConnect.on('connect', function() {
    console.log('Connected to pushtopic: ' + config.PUSH_TOPIC);
  });
  streamingConnect.on('error', function(error) {
    console.log('Error received from pushtopic: ' + error);
  });
  streamingConnect.on('data', function(data) {
    console.log('Received the following from pushtopic:');
    console.log(data);
    io.sockets.emit('records', data);
    console.log('after sent to emilt');

  });
});
app.set('port', process.env.PORT || 3001);
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'ejs');
app.get('/', function(req, res) {
  res.render('index');

})

server.listen(app.get('port'), function() {
  console.log('Express server listening on port %d in %s mode', app.get(
    'port'), app.get('env'));
});

In index.js once your receive the notification, you can broadcasting it by using socket emit method as show below

streamingConnect.on('data', function(data) {
    console.log('Received the following from pushtopic:');
    console.log(data);
   <strong> io.sockets.emit('records', data);</strong>
    console.log('after sent to emilt');

  });

The emitted message is reviving in views as show below


var socket = io(url);
socket.on('records', function (data) {
console.log(data);
console.log(data.sobject );
console.log(data.sobject.Id );

//var results = JSON.parse(data.sobject);
var streamList = $('ul.streams');

streamList.prepend('<br/> <li>' +
  data.sobject.Name + ' : ' + data.sobject.StageName +
   data.sobject.Amount + ': ' + data.sobject.ExpectedRevenue +'</li>');
});

Now you can run the app by simply two commands

npm install

node index.js

after that you can open “http://localhost:3001/ ” in the browser you will see the notification on the browser

Capture

Step 4: Pushing it to Heroku
you can push this app to Heroku with simple steps
Heroku login
Heroku create
git push heroku master
heroku ps:scale web=1
heroku open

 

 

 

 

 

 

 

 

 

 

 

 

Sales force Streaming API

In this blog post i am going to explain how to set up the sales force streaming basic concepts and how to setup the streaming API .With the sales force Streaming API client  application ( It may be sales force it self or third party application ) can receive the near real time data updates with out refreshing or reloading the applications based on the push topic created . Steaming API use “Push notification” technology that allow you to send the notification to client without client request which is opposite to the pull technology .  A streaming API differs from the normal REST API in the way that it leaves the HTTP connection open for as long as possible(i.e. “persistent connection  or Long polling “). It pushes data to the client as and when it’s available and there is no need for the client to poll the requests to the server for newer data. This approach of maintaining a persistent connection reduces the network latency significantly when a server produces continuous stream of data like say, today’s social media channels.

How Streaming API work’s ? 

Streaming API is implemented in the CometD framework which holds the Bayeux protocol created for providing an asynchronous message(AJAX style )  by HTTP using long polling connections which is wide open . the basic life cycle of streaming API is as follows

  1. Client makes an initial call  to server and establish the handshake with server.
  2. After establishing the handshake client can be subscribed for the streaming channel
  3. client listens to that event using long polling . Server defers the response to call until new information is available or until a particular status or the call timed out.
  4. Whenever new information is available, server sends back the data to client as response. Clint will consume the response and the connection go to the idea state after receive the response .
  5.  Server returns to step 3

Push Technology
Push technology also called publish/subscribe model, transfers information that is initiated from a server to the client . push technology is the  asynchronous communication between a server and client  . In push technology, the server pushes out information to the client after the client has subscribed to a channel of information. The server-client connection always remains open, so that when another event occurs the data is immediately sent to the client without refreshing or reloading the apps .

Bayeux Protocol
Bayeux is a JSON-based protocol which is more flexible and scalable to transfer asynchronous message with low latency. The messages are routed via named channels. Server-push technology is used to deliver asynchronous messages from server to client.

CometD
CometD is a scalable HTTP-based event routing bus that uses an AJAX Push technology pattern known as Comet . It implements the Bayeux protocol.

Long Polling
Long polling is a technique in which the client makes an Ajax request to the server, and it is kept open until the server has new data to send to the clients . Upon receiving the server response, the clients initiates a new long polling request in order to obtain the next data is available .

Making your Org Ready :- 

Please make sure you have below permission to to use streaming API .

1) The “Streaming API” permission must be enabled -> “Your Name > Setup > Customize > User Interface”
2) The logged-in user must have “Read” permission on the PushTopic standard object to receive notifications.
3) The logged-in user must have “Create” permission on the PushTopic standard object to create and manage PushTopic records.

Setting Up Streaming API in salesforce :- 

Step 1 :- PushTopic 
Creating a push topic is simple . you need to create a PushTopic object records how similarly  you are inserting account or other standard object records .A PushTopic enables you to define the object, fields, and criteria you’re  interested in receiving event notifications for in near real time . “PushTopic” is the Object API name  and required field for creating a record are Name, Query  and ApiVersion.

Go to developer console and execute the below code from Execute anonymous window


PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'OpportunityChannel';
pushTopic.Query = 'SELECT Id, Name,Amount, StageName ,CloseDate,ExpectedRevenue FROM Opportunity where StageName!=\'Closed Lost\'';
pushTopic.ApiVersion = 39.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

 

  • Name – Name of the PushTopic Channel
  • API Version – API version of Push topic
  • Query, which holds a string representation of a SOQL query
  • notifyForOperationCreate, if true insert DML  calls will trigger a push event
  • notifyForOperationUpdate, if true update DML calls will trigger a push event
  • notifyForOperationDelete, if true delete DML calls will trigger a push event
  • notifyForOperationUndelete, if true undelete DML calls will trigger a push event

PushTopic evaluation is based on the query you specified for Push topic objects .In our case change to the field Name,Amount, Stage Name ,Close Date,Expected Revenue on opportunity would cause Push topic  to execute .If the record changes match the criteria of the PushTopic query, a notification is generated by the server and received by the subscribed clients.

The NotifyForFields attribute of the Pushtopic is responsible for the evaluation of the fields. The following settings are possible:

  1. All: Notifications are generated for all record field changes, provided the values of the fields referenced in the WHERE clause match the values specified in the WHERE clause.
  2. Referenced (default): Changes to fields referenced in both the SELECT clause and WHERE clause are evaluated. Notifications are generated for all records where a field referenced in the SELECT clause changes or a field referenced in the WHERE clause changes and the values of the fields referenced in the WHERE clause match the values specified in the WHERE clause.
  3. Select: Changes to fields referenced in the SELECT clause are evaluated. Notifications are generated for all records where a field referenced in the SELECT clause changes and the values of the fields referenced in the WHERE clause match the values specified in the WHERE clause.
  4. Where: Changes to fields referenced in the WHERE clause are evaluated. Notifications are generated for all records where a field referenced in the WHERE clause changes and the values of the fields referenced in the WHERE clause match the values specified in the WHERE clause.

Step 2:-Static Resource

Upload the the following  java script libraries to salesforce static resources .  you can download from ‘https://download.cometd.org/’ link .

  • cometd-<version>/cometd-javascript/common/target/org/cometd.js
  • cometd-<version>/cometd-javascript/jquery/src/main/webapp/jquery/jquery-1.5.1.js
  • cometd-<version>/cometd-javascript/jquery/src/main/webapp/jquery/json2.js
  • cometd-<version>/cometd-javascript/jquery/src/main/webapp/jquery/jquery.cometd.js

Step 3:- Create client  (Lets Take a simple Visual force page) 

we are assuming that in this case , visual force page from the same org is the Client . In other cases you may think of having third party applications .Code is shown here below for visual force page .

 

<apex:page standardStylesheets=”false” showHeader=”false” sidebar=”false”>

<apex:includeScript value=”{!$Resource.cometd}”/>
<apex:includeScript value=”{!$Resource.jquery}”/>
<apex:includeScript value=”{!$Resource.json2}”/>
<apex:includeScript value=”{!$Resource.jquery_cometd}”/>

(function($){
$(document).ready(function() {
$.cometd.configure({
url: window.location.protocol+’//’+window.location.hostname+ (null != window.location.port ? (‘:’+window.location.port) : ”) +’/cometd/40.0/’,
requestHeaders: { Authorization: ‘OAuth {!$Api.Session_ID}’}
});
$.cometd.handshake();
$.cometd.addListener(‘/meta/handshake’, function(message) {
$.cometd.subscribe(‘/topic/OpportunityChannel’, function(message) {
var div = document.getElementById(‘content’);
div.innerHTML = div.innerHTML + ‘

Notification

‘ +
‘Streaming Message ‘ + JSON.stringify(message) + ‘
‘;
});
});

});
})(jQuery)

</apex:page>

Step 4:- Testing 

Now you can go and preview the visual force page . on the other window start creating or updating the opportunities . You can see the real time notification to the page as show below

Capture 3

 

 Limits :-

  • The maximum size of the HTTP request post body that the server can accept from the client is 32,768 bytes, for example, when you call the CometD subscribe or connect methods. If the request message exceeds this size, the following error is returned in the response: 413 Maximum Request Size Exceeded.
  • If the client receives events, it should reconnect immediately to receive the next set of events. If the reconnection doesn’t occur within 40 seconds, the server expires the subscription and the connection closes. The client must start over with a handshake and subscribe again.
  • If no events are generated and the client is waiting and the server closes the connection, after two minutes the client should reconnect immediately.
  • The SELECT statement’s field list must include Id
  • You can query from one object
  • aggregate queries or semi-joins aren’t supported.
  • All custom objects are supported in PushTopic queries. The following subset of standard objects are supported in PushTopic queries:Account, Campaign, Case, Contact, Lead, Opportunity, Task. The following standard objects are supported in PushTopic queries through a pilot program: ContractLineItem, Entitlement, LiveChatTranscript, Quote, QuoteLineItem, ServiceContract.

Platform Events in Salesforce

Introduction

In this blog post, I am going to explain about platform events a new feature generally available from Summer ’17 release as part of the “Enterprise Message Platform” which provide event driven architecture.

Let’s talk about Event Driven Architecture 

Salesforce event-driven architecture is consists of event producers, event consumers, and channels.  Platform events simplify the process of communicating changes and responding to events. Publishers and subscribers communicate with each other through events. One or more subscribers can listen to the same event and carry out actions .with an Event-driven architecture each service publishes an event whenever it updates or creates a data. Other services can subscribe to events.It enables an application to maintain data consistency across multiple services without using distributed transactions.  Let us take an example of order management. When the Order management app creates an Order in a pending state and publishes an OrderCreated event.The Customer Service receives the event and attempts to process an Order. It then publishes an OrderUpdate event.Then OrderUpdate Service receives the event from the changes the state of the order to either approved or canceled or fulfilled.The following  diagram show the event driven architect

Capture

Terminology 

Event
A change in state that is meaningful in a business process. For example, a placement o of an order is a meaningful event because the order fulfillment center requires notification to process the order.
Event Notifier 
A message that contains data about the event. Also known as an event notification.
Event producer
The publisher of an event message over a channel.
Channel
A conduit in which an event producer transmits a message. Event consumers subscribe to the channel to receive messages.
Event consumer
A subscriber to a channel that receives messages from the channel. A change in state that is meaningful in a business process.

Looks like Streaming API, But Really not 

But when you overlook at Platform events it makes similar to Streaming API and most of the futures including the replayID and durability but below makes the difference between with streaming API.

  • Platform  events are special kinds of entity similar to custom object custom object
  • You can publish and consume platform events by using Apex or a REST API or SOAP API. Platform events integrate with the Salesforce platform through Apex triggers. Triggers are the event consumers on the Salesforce platform that listen to event messages.Unlike custom objects, you can’t update or delete event records. You also can’t view event records in the Salesforce user interface, and platform events don’t have page layouts. When you delete a platform event definition, it’s permanently deleted.
  • Platform events may be published using declarative tools (Process Builder)
  • platform events can also be subscribed to using APEX  or decoratively process builder  and flows

Another major,off-course really impressive one is you can publish changes from apex trigger and you can consume from apex trigger trigger

Publishing and subscribing Platform events 

Publishing and subscribing the platform event are more flexible. You can publish event messages from a Force.com app or an external app using Apex or Salesforce APIs and you can subscribe from the Salesforce or external apps or use long polling with cometD as well.

Let’s take an Example:- 

Now I am going to explain step by step to set up, publish and consume events. What we are going to do it Employee Onboarding Process. Now Once an external app publishes the events, we are going to create an account and when Salesforce publish onboarding events another system is going to receive the platform events.

1: – Define a Platform event


You can define platform event similar like custom object, go to setup –> develope –> Platform events –> create new platform events as shown below.

Capture

By seeing it looks like custom objects but here are the few major considerations.

  • Platform event is appended the __e suffix for API name of the event.
  • you can’t query Platform events through SOQL or SOSL.
  •  you can’t use Platform in reports, list views, and search.
  •  published platform events can’t be rolled back.
  • e methods aren’t supported with platform events.
  • All platform event fields are read-only by default
  • Platform events don’t have an associated tab
  • Only after insert Triggers Are Supported
  • You can access platform events both through API and declaratively
  • You can control platform events though Profiles and permissions

2: – Publishing Platform Events

You can publish events using an Apex method or with declarative tools, such as Process Builder or the Cloud Flow Designer or you can publish events using Salesforce API. we are going to seeing all the ways how to publish the platform events.

Publish Using Apex

you can publish platform events by using apex trigger or execute anonymously and batch Apex etc.But here I am going to publish by using Apex triggers. A trigger processes platform event notifications sequentially in the order they’re received and trigger runs in its own process asynchronously and isn’t part of the transaction that published the event.

trigger PlatformEventPublish on Account (after insert , after update ) {
    
    If(trigger.isAfter && trigger.isUpdate){
        List<Employee_On_boarding__e> publishEvents = new List<Employee_On_boarding__e>();
        for(Account a : Trigger.new){
            Employee_On_boarding__e eve = new Employee_On_boarding__e();
            eve.Name__c = a.Name ; 
            eve.Phone__c = a.Phone ; 
            eve.Salary__c = a.AnnualRevenue ; 
            publishEvents.add(eve);            
        }
        if(publishEvents.size()>0){
            EventBus.publish(publishEvents);
        }
        
    }
    
}

 

 Now if you can see, Salesforce has a special class to publish the platform events EventBus which is having methods publish method. once the event is published you can consume the events from the channel

Publish Using Process Builder 

You can publish platform events using the declarative tools like process builders and flows. Here is the image shows the platform events insert by using process builder.

Capture

Publish Events by Using API

 

Now I am going to see another way of publishing events from API. I am going to use the workbench to publish the events.
Capture

3: – Subscribe for Platform events from the channel

You can now subscribe to the platform events from the Platform events object trigger which is created in step 1. Here is the sample trigger show how you can handle the subscribed events. Here simply I am creating new accounts from the platform even but you can implement your own business logic to update the data .

trigger OnBoaringTrigger on Employee_On_boarding__e (after insert) {
    List<Account> acc = new List<Account>();
    for(Employee_On_boarding__e oBording :trigger.new){
        acc.add(new Account(Name =oBording.Name__c , Phone =oBording.Phone__c , AnnualRevenue = oBording.Salary__c));
    }
    if(acc.size() >0){
        insert acc ;
    }
}

 

Here is the simple visual force page that consumes the platform events which you published. This page is built on cometD.
you can consume the platform events by using this  URI /event/Employee_On_boarding__e and the Complete code is here below.

<apex:page standardStylesheets="false" showHeader="false" sidebar="false">
    <div id="content">
        
    </div>
    <apex:includeScript value="{!$Resource.cometd}"/>
    <apex:includeScript value="{!$Resource.jquery}"/>
    <apex:includeScript value="{!$Resource.json2}"/>
    <apex:includeScript value="{!$Resource.jquery_cometd}"/>
    
    <script type="text/javascript">
    (function($){
        $(document).ready(function() {
            $.cometd.configure({
                url: window.location.protocol+'//'+window.location.hostname+ (null != window.location.port ? (':'+window.location.port) : '') +'/cometd/40.0/',
                requestHeaders: { Authorization: 'OAuth {!$Api.Session_ID}'}
            });
            $.cometd.handshake();
            $.cometd.addListener('/meta/handshake', function(message) {
                $.cometd.subscribe('/event/Employee_On_boarding__e', function(message) {
                    var div = document.getElementById('content');
                    
                    div.innerHTML = div.innerHTML + '<p>Notification </p><br/>' +
                        'Streaming Message ' + JSON.stringify(message) + '</p><br>';
                });
            });
            
        });
    })(jQuery)
    </script>
</apex:page>

Key points


1 . Platform events are executed under ” Automated Process entity.” So you have set Automated Process in debug logs
2. You can control the Platform events on Profile and permission sets
3.You can see all the platform events that are subscribed under Platform events objects.
Capture
4. Platform events have lifecycles state like Running, Ideal, Suspended,, Error, Expired
5.Platform events are having retry mechanism.

if (EventBus.TriggerContext.currentContext().retries < 4) {
// Condition isn't met, so try again later.
throw new EventBus.RetryableException(
'Condition is not met, so retrying the trigger again.');
} else {
// Trigger was retried enough times so give up and
// resort to alternative action.
// For example, send email to user.
}