Creating and Consuming Jobs for Offloading creating-and-consuming-jobs-for-offloading

CAUTION
AEM 6.4 has reached the end of extended support and this documentation is no longer updated. For further details, see our technical support periods. Find the supported versions here.

The Apache Sling Discovery feature provides a Java API that enables you to create JobManager jobs and JobConsumer services that consume them.

For information about creating offloading topologies and configuring topic consumption, see Offloading Jobs.

Handling Job Payloads handling-job-payloads

The offloading framework defines two job properties that you use to identify the job payload. The offloading replication agents use these properties to identify the resources to replicate to the instances in the topology:

  • offloading.job.input.payload: A comma-separated list of content paths. The content is replicated to the instance that executes the job.
  • offloading.job.output.payload: A comma-separated list of content paths. When job execution is complete, the job payload is replicated to these paths on the instance that created the job.

Use the OffloadingJobProperties enum to refer to the property names:

  • OffloadingJobProperties.INPUT_PAYLOAD.propertyName()
  • OffloadingJobProperties.OUTPUT_PAYLOAD.propetyName()

Jobs do not require payloads. However, the payload is necessary if the job requires the manipulation of a resource and the job is offloaded to a computer that did not create the job.

Creating Jobs for Offloading creating-jobs-for-offloading

Create a client that calls the JobManager.addJob method to create a job that an automatically-selected JobConsumer executes. Provide the following information to create the job:

  • Topic: The job topic.
  • Name: (Optional)
  • Properties Map: A Map<String, Object> object that contains any number of properties, such as the input payload paths and output Payload paths. This Map object is available to the JobConsumer object that executes the job.

The following example service creates a job for a given topic and input payload path.

package com.adobe.example.offloading;

import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.apache.felix.scr.annotations.Reference;

import java.util.HashMap;

import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;

import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceResolver;

import com.adobe.granite.offloading.api.OffloadingJobProperties;

@Component
@Service
public class JobGeneratorImpl implements JobGenerator  {

 @Reference
 private JobManager jobManager;
 @Reference ResourceResolverFactory resolverFactory;

 public String createJob(String topic, String payload) throws Exception {
  Job offloadingJob;

  ResourceResolver resolver = resolverFactory.getResourceResolver(null);
  if(resolver.getResource(payload)!=null){

   HashMap<String, Object> jobprops = new HashMap<String, Object>();
   jobprops.put(OffloadingJobProperties.INPUT_PAYLOAD.propertyName(), payload);

   offloadingJob = jobManager.addJob(topic, null, jobprops);
  } else {
   throw new Exception("Payload for job cannot be found");
  }
  if (offloadingJob == null){
   throw new Exception ("Offloading job could not be created");
  }
  return offloadingJob.getId();
 }
}

The log contains the following message when JobGeneratorImpl.createJob is called for the com/adobe/example/offloading topic and the /content/geometrixx/de/services payload:

10.06.2013 15:43:33.868 *INFO* [JobHandler: /etc/workflow/instances/2013-06-10/model_1554418768647484:/content/geometrixx/en/company] com.adobe.example.offloading.JobGeneratorImpl Received request to make job for topic com/adobe/example/offloading and payload /content/geometrixx/de/services

Developing a Job Consumer developing-a-job-consumer

To consume jobs, develop an OSGi service that implements the org.apache.sling.event.jobs.consumer.JobConsumer interface. Identify with the topic to consume using the JobConsumer.PROPERTY_TOPICS property.

The following example JobConsumer implementation registers with the com/adobe/example/offloading topic. The consumer simply sets the Consumed property of the payload content node to true.

package com.adobe.example.offloading;

import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jcr.Session;
import javax.jcr.Node;

import com.adobe.granite.offloading.api.OffloadingJobProperties;

@Component
@Service
public class MyJobConsumer implements JobConsumer {

 public static final String TOPIC = "com/adobe/example/offloading";

 @Property(value = TOPIC)
 static final String myTopic = JobConsumer.PROPERTY_TOPICS;

 @Reference
 private ResourceResolverFactory resolverFactory;

 @Reference
 private JobManager jobManager;

 private final Logger log = LoggerFactory.getLogger(getClass());

 public JobResult process(Job job) {
  JobResult result = JobResult.FAILED;
  String topic = job.getTopic();
  log.info("Consuming job of topic: {}", topic);
  String payloadIn =  (String) job.getProperty(OffloadingJobProperties.INPUT_PAYLOAD.propertyName());
  String payloadOut =  (String) job.getProperty(OffloadingJobProperties.OUTPUT_PAYLOAD.propertyName());

  log.info("Job has Input Payload {} and Output Payload {}",payloadIn, payloadOut);

  ResourceResolver resolver = null;
  try {
   resolver = resolverFactory.getAdministrativeResourceResolver(null);
   Session session = resolver.adaptTo(Session.class);
   Node inNode = session.getNode(payloadIn);
   inNode.getNode(Node.JCR_CONTENT).setProperty("consumed",true);
   result = JobResult.OK;
  }catch (Exception e){
   log.info("ERROR -- JOB RESULT IS FAILURE " + e.getMessage());
   result = JobResult.FAILED;
  }
  log.info("Job OK for payload {}",payloadIn);
  return result;
 }
}

The MyJobConsumer class generates the following log messages for an input payload of /content/geometrixx/de/services:

10.06.2013 16:02:40.803 *INFO* [pool-7-thread-17-<main queue>(com/adobe/example/offloading)] com.adobe.example.offloading.MyJobConsumer Consuming job of topic: com/adobe/example/offloading
10.06.2013 16:02:40.803 *INFO* [pool-7-thread-17-<main queue>(com/adobe/example/offloading)] com.adobe.example.offloading.MyJobConsumer Job has Input Payload /content/geometrixx/de/services and Output Payload /content/geometrixx/de/services
10.06.2013 16:02:40.884 *INFO* [pool-7-thread-17-<main queue>(com/adobe/example/offloading)] com.adobe.example.offloading.MyJobConsumer Job OK for payload /content/geometrixx/de/services

The Consumed property can be observed using CRXDE Lite:

chlimage_1-25

Maven Dependencies maven-dependencies

Add the following dependency defenitions to your pom.xml file so that Maven can resolve the Offloading-related classes.

<dependency>
   <groupId>org.apache.sling</groupId>
   <artifactId>org.apache.sling.event</artifactId>
   <version>3.1.5-R1485539</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>com.adobe.granite</groupId>
   <artifactId>com.adobe.granite.offloading.core</artifactId>
   <version>1.0.4</version>
   <scope>provided</scope>
</dependency>

The previous examples also required the following dependency definitions:

<dependency>
   <groupId>org.apache.sling</groupId>
   <artifactId>org.apache.sling.api</artifactId>
   <version>2.4.3-R1488084</version>
   <scope>provided</scope>
</dependency>

<dependency>
   <groupId>org.apache.sling</groupId>
   <artifactId>org.apache.sling.jcr.jcr-wrapper</artifactId>
   <version>2.0.0</version>
   <scope>provided</scope>
</dependency>
recommendation-more-help
2315f3f5-cb4a-4530-9999-30c8319c520e