// ActivityPubObjectRepositoryDefault.java
package org.linkedopenactors.rdfpub.adapter.driven;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryResults;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryResult;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.linkedopenactors.rdfpub.domain.Activity;
import org.linkedopenactors.rdfpub.domain.ActivityPubObject;
import org.linkedopenactors.rdfpub.domain.ActivityPubObjectRevision;
import org.linkedopenactors.rdfpub.domain.ActivityPubObjectRevisionFactory;
import org.linkedopenactors.rdfpub.domain.IRI;
import org.linkedopenactors.rdfpub.domain.ResourceFactory;
import org.linkedopenactors.rdfpub.domain.service.ActivityPubObjectRepository;
import org.linkedopenactors.rdfpub.domain.service.SparqlRepository;
import org.linkedopenactors.rdfpub.rdf4j.ModelToRdfObject;
import org.linkedopenactors.rdfpub.rdf4j.RdfObjectToModel;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ResponseStatusException;
import de.naturzukunft.rdf4j.utils.ModelLogger;
import de.naturzukunft.rdf4j.vocabulary.AS;
import de.naturzukunft.rdf4j.vocabulary.SCHEMA_ORG;
import lombok.extern.slf4j.Slf4j;
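/**
 * RDF4J based implementation of {@link ActivityPubObjectRepository} and {@link SparqlRepository}.
 * Objects are stored as statements in named graphs (contexts) of the underlying {@link Repository}.
 * Every object carries a schema:identifier and a schema:version, which this class uses for version
 * checks on save and for revision lookups on read.
 * <p>
 * Illustrative usage, assuming a {@code repo} instance of this class and a {@code resourceFactory}
 * (both names are hypothetical):
 * <pre>{@code
 * Set<IRI> contexts = Set.of(resourceFactory.iri("http://example.org/actor/outbox"));
 * Optional<ActivityPubObject> note =
 *     repo.find(resourceFactory.iri("http://example.org/objects/note-1"), contexts);
 * }</pre>
 */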
@Slf4j
@Component
class ActivityPubObjectRepositoryDefault implements ActivityPubObjectRepository, SparqlRepository {
private final Repository repository;
private final RdfObjectToModel rdfObjectToModel;
private final ModelToRdfObject modelToRdfObject;
private final ResourceFactory resourceFactory;
private final ActivityPubObjectRevisionFactory activityPubObjectRevisionFactory;
private final SparqlExecutor sparqlExecutor;
public ActivityPubObjectRepositoryDefault(Repository repository, RdfObjectToModel rdfObjectToModel, ModelToRdfObject modelToRdfObject,
ResourceFactory resourceFactory, ActivityPubObjectRevisionFactory activityPubObjectRevisionFactory,
SparqlExecutor sparqlExecutor) {
this.repository = repository;
this.rdfObjectToModel = rdfObjectToModel;
this.modelToRdfObject = modelToRdfObject;
this.resourceFactory = resourceFactory;
this.activityPubObjectRevisionFactory = activityPubObjectRevisionFactory;
this.sparqlExecutor = sparqlExecutor;
}
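	/**
	 * Loads all statements with the given subject from the given contexts and converts them into an
	 * {@link ActivityPubObject}, enriched with the contexts the statements were found in and the
	 * namespaces known to the connection.
	 * @param subject the subject to look up
	 * @param contexts the named graphs to search
	 * @return the object, or {@link Optional#empty()} if no statement with that subject exists
	 */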
@Override
public Optional<ActivityPubObject> find(IRI subject, Set<IRI> contexts) {
log.trace("->readRdfObject(subject: "+subject+", contexts: "+contexts);
Optional<ActivityPubObject> result = Optional.empty();
try (RepositoryConnection con = repository.getConnection()) {
Resource[] array = toResourceArray(contexts);
Model resultModel = QueryResults.asModel(con.getStatements(Values.iri(subject.toString()), null, null, array));
if(!resultModel.isEmpty()) {
result = Optional.of(createActivityPubObject(subject, resultModel));
}
addContexts(result.orElse(null), resultModel);
copyNamespaces(result.orElse(null), con);
}
log.trace("<-readRdfObject("+result.toString()+")");
return result;
}
private void addContexts(ActivityPubObject result, Model resultModel) {
Optional<ActivityPubObject> resultOpt = Optional.ofNullable(result);
		Set<IRI> contexts = StreamSupport.stream(resultModel.getStatements(null, null, null).spliterator(), false)
				.map(Statement::getContext)
				.filter(context -> context != null) // statements in the default graph have no context
				.map(Resource::stringValue)
				.map(resourceFactory::iri)
				.collect(Collectors.toSet());
resultOpt.ifPresent(e->e.addContext(contexts));
}
private void copyNamespaces(ActivityPubObject rdfObject, RepositoryConnection con) {
Optional<ActivityPubObject> rdfObjectOpt = Optional.ofNullable(rdfObject);
		List<Namespace> namespaces = new ArrayList<>();
		try (RepositoryResult<Namespace> repositoryNamespaces = con.getNamespaces()) {
			repositoryNamespaces.forEach(namespaces::add);
		}
rdfObjectOpt.ifPresent(r->{
namespaces.forEach(nns->r.addNamespace(nns.getPrefix(), resourceFactory.iri(nns.getName())));
});
}
private ActivityPubObject createActivityPubObject(IRI subject, Model resultModel) {
return modelToRdfObject.toActivityPubObject(Values.iri(subject.toString()), resultModel);
}
@Override
public void save(ActivityPubObject activityPubObject) {
saveObjects(List.of(activityPubObject));
}
@Override
public void save(ActivityPubObject activity, ActivityPubObjectRevision rev) {
saveObjects(rev.getActivityPubObjects());
save(activity);
}
@Override
public void save(Activity activity, ActivityPubObject object) {
save(activity);
save(object);
}
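	/**
	 * Saves all passed objects within a single transaction, ordered by ascending schema:version so that
	 * earlier revisions are written first. The transaction is rolled back if saving any object fails.
	 */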
public void saveObjects(List<? extends ActivityPubObject> activityPubObjects) {
try (RepositoryConnection con = repository.getConnection()) {
con.begin();
try {
activityPubObjects.stream()
				.sorted(Comparator.comparing(ActivityPubObject::getVersion))
.forEach(rdfObject -> saveObject(rdfObject, con));
con.commit();
} catch (Exception e) {
con.rollback();
log.error("error saving objects", e);
throw e;
}
}
}
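	/**
	 * Determines the highest schema:version stored for the given identifier and types, resolves the
	 * subject of that revision and loads the corresponding object from the given contexts.
	 */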
public Optional<ActivityPubObject> findLatestByIdentifierAndTypes(String identifier, Set<IRI> type,
Set<IRI> contexts) {
try (RepositoryConnection con = repository.getConnection()) {
return findLatestVersion(identifier, type, contexts, con)
.flatMap(v->findSubject(v, identifier, type, contexts, con))
.flatMap(id->find(id, contexts));
}
}
public Set<ActivityPubObject> findLatestByIdentifier(String identifier, Set<IRI> contexts) {
try (RepositoryConnection con = repository.getConnection()) {
Optional<BigInteger> latest = findLatestVersion(identifier, Collections.emptySet(), contexts, con);
return latest.map(lv->{
return findSubjects(lv, identifier, contexts, con).stream()
.map(subject->find(subject, contexts, con))
.collect(Collectors.toSet());
}).orElse(new HashSet<>());
}
}
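	/**
	 * Collects every object carrying the given schema:identifier and schema:version from the given
	 * contexts into one {@link ActivityPubObjectRevision}.
	 */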
@Override
public Optional<ActivityPubObjectRevision> findByIdentifierAndVersion(String identifier, BigInteger version, Set<IRI> contexts) {
try (RepositoryConnection con = repository.getConnection()) {
ActivityPubObjectRevision activityPubObjectRevision = activityPubObjectRevisionFactory.create();
QueryResults.asModel(con.getStatements(null, SCHEMA_ORG.identifier, Values.literal(identifier), toResourceArray(contexts))).stream()
.map(Statement::getSubject)
				.map(Resource::stringValue)
				.map(resourceFactory::iri)
.map(sub->find(sub, contexts, con))
.filter(obj->obj.getVersion().equals(version))
.forEach(activityPubObjectRevision::add);
			return activityPubObjectRevision.isEmpty() ? Optional.empty() : Optional.of(activityPubObjectRevision);
}
}
private ActivityPubObject find(IRI iri1, Set<IRI> contexts, RepositoryConnection con) {
org.eclipse.rdf4j.model.IRI rdf4jIri = Values.iri(iri1.toString());
Model m = QueryResults.asModel(con.getStatements(rdf4jIri, null, null, toResourceArray(contexts)));
return modelToRdfObject.toActivityPubObject(rdf4jIri, m);
}
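	/**
	 * Persists a single object. Two guards protect the revision history: the exact combination of
	 * identifier, version, types and contexts must not exist yet, and if earlier revisions exist the
	 * new version has to be exactly latest + 1 (e.g. when version 3 is stored, only version 4 is
	 * accepted next). Only statements whose subject is the object's own subject are written.
	 */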
private void saveObject(ActivityPubObject rdfObject, RepositoryConnection con) {
log.trace("->saveObject");
String identifier = rdfObject.getIdentifier();
BigInteger version = rdfObject.getVersion();
Optional<IRI> subjectOpt = findSubject(version, identifier, rdfObject.types(), rdfObject.contexts(), con);
if (subjectOpt.isPresent()) {
throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "object with version " + version + ", identifier: " + identifier + ", types: "+rdfObject.types()+", contexts: "+rdfObject.contexts()+" already exists!");
}
findLatestVersion(identifier, rdfObject.types(), rdfObject.contexts(), con).ifPresent(latest->{
			BigInteger expected = latest.add(BigInteger.ONE);
			if(!expected.equals(version)) { // BigInteger equality must use equals(), not ==
				String msg = "the latest version of " + rdfObject.getSubject() + " is " + latest + ", the next expected version is " + expected + ", but " + version + " was given";
log.error(msg + "\n" + rdfObject);
RepositoryResult<Statement> result = con.getStatements(Values.iri(rdfObject.getSubject().toString()), null, null);
ModelLogger.trace(log, QueryResults.asModel(result), "subject in database");
throw new IllegalStateException(msg);
}
});
Model model = rdfObjectToModel.convert(rdfObject);
Model filtered = model.filter(Values.iri(rdfObject.getSubject().toString()), null, null);
log.trace("saving " + filtered.size() + " statements. Identifier: " + identifier + "; version: " + version + ", types: " + rdfObject.types());
ModelLogger.trace(log, filtered, RDFFormat.TRIGSTAR, "saving ");
con.add(filtered, toResourceArray(rdfObject.contexts()));
filtered.getNamespaces().forEach(ns->con.setNamespace(ns.getPrefix(), ns.getName()));
}
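	/**
	 * Converts the domain contexts into rdf4j {@link Resource}s usable as a context filter.
	 * An empty set yields an empty array, which rdf4j interprets as "all contexts".
	 */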
private org.eclipse.rdf4j.model.Resource[] toResourceArray(Set<IRI> contexts) {
return contexts.stream()
.map(Object::toString)
.map(Values::iri)
.collect(Collectors.toSet())
.toArray(org.eclipse.rdf4j.model.IRI[]::new);
}
private String[] toStringArray(Set<IRI> contexts) {
return contexts.stream()
.map(Object::toString)
.collect(Collectors.toSet())
.toArray(String[]::new);
}
private Optional<IRI> findSubject(BigInteger version, String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
Set<IRI> subjects = findSubjectInternal(version, identifier, types, contexts, con);
if(subjects.size()>1) {
throw new IllegalStateException("more than one subject for version: " + version + ", identifier: " + identifier + ", types: " + types);
}
return subjects.stream().findFirst();
}
private Set<IRI> findSubjects(BigInteger version, String identifier, Set<IRI> contexts, RepositoryConnection con) {
return findSubjectInternal(version, identifier, Collections.emptySet(), contexts, con);
}
private Optional<BigInteger> findLatestVersion(String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
Set<BigInteger> versions = findVersions(identifier, types, contexts, con);
Optional<BigInteger> max = versions.stream().max(Comparator.naturalOrder());
log.trace("findLatestVersion("+identifier+","+types+"): " + max);
return max;
}
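	/**
	 * Collects all schema:version values stored for the given identifier (optionally restricted to the
	 * given types) in the given contexts, using a dynamically built SPARQL query.
	 */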
private Set<BigInteger> findVersions(String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
		String from = contexts.stream()
				.map(ctx -> "FROM <" + ctx + ">")
				.collect(Collectors.joining("\n"));
Set<BigInteger> versions = new HashSet<>();
StringBuilder queryStringBuilder = new StringBuilder();
queryStringBuilder.append(
"""
PREFIX %s: <%s>
				PREFIX %s: <%s>
				PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT *
%s
WHERE {
""".formatted( AS.PREFIX, AS.NAMESPACE,
SCHEMA_ORG.PREFIX, SCHEMA_ORG.NAMESPACE, from));
queryStringBuilder.append(getTypesQueryPart(new ArrayList<>(types)));
queryStringBuilder.append("""
?subject schema:version ?version .
				?subject schema:identifier "%s" .
}
""".formatted(identifier));
String queryString = queryStringBuilder.toString();
log.trace("findVersions(" + identifier + "," + types + "): \n" + queryString);
TupleQuery tupleQuery = con.prepareTupleQuery(queryString);
try (TupleQueryResult result = tupleQuery.evaluate()) {
log.trace("findVersions hasResult: " + result.hasNext());
while (result.hasNext()) { // iterate over the result
BindingSet bindingSet = result.next();
Value version = bindingSet.getValue("version");
if(version.isLiteral()) {
versions.add(((Literal)version).integerValue());
}
}
}
log.trace("findVersions(" + identifier + "," + types + "): " + versions);
return versions;
}
/**
	 * Finds the subjects in the passed contexts that have the passed types and match the passed identifier and version.
	 * @param version The <a href="https://schema.org/version">version</a> that is mandatory for each rdf-pub object. See {@link ActivityPubObjectRevision}
	 * @param identifier The <a href="https://schema.org/identifier">identifier</a> that is mandatory for each rdf-pub object. See {@link ActivityPubObjectRevision}
	 * @param types The {@link RDF#TYPE}s that should be queried.
	 * @param contexts The <a href="https://rdf4j.org/documentation/programming/repository/#using-named-graphscontext">graphs</a> that contain the created {@link ActivityPubObject}.
* @param con {@link RepositoryConnection} to use for searching.
* @return {@link Set} of {@link IRI} with the subjects for the passed arguments.
*/
private Set<IRI> findSubjectInternal(BigInteger version, String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
Set<IRI> subjects = new HashSet<>();
String queryString = getFindSubjectQuery(version, identifier, types, contexts);
log.trace("findSubjectInternal queryString: " + queryString);
TupleQuery tupleQuery = con.prepareTupleQuery(queryString);
try (TupleQueryResult result = tupleQuery.evaluate()) {
log.trace("findSubject hastResult: " + result.hasNext());
while (result.hasNext()) { // iterate over the result
BindingSet bindingSet = result.next();
Value v = bindingSet.getValue("subject");
if (!v.isIRI()) {
throw new IllegalStateException("no IRI: " + v);
} else {
subjects.add(resourceFactory.iri(v.stringValue()));
}
}
}
log.trace("findSubjectInternal size: " + subjects.size());
return subjects;
}
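	// Illustrative shape of the query built by getFindSubjectQuery(..) below, with the PREFIX lines
	// (taken from the Prefix enum) omitted; the context, type and identifier values are hypothetical:
	//
	//   SELECT *
	//   FROM <http://example.org/actor/outbox>
	//   WHERE {
	//     ?subject rdf:type <https://www.w3.org/ns/activitystreams#Note> .
	//     ?subject schema:version 2 .
	//     ?subject schema:identifier "note-1" .
	//   }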
/**
	 * Builds the SPARQL query used by {@link #findSubjectInternal(BigInteger, String, Set, Set, RepositoryConnection)}.
	 * @param version The <a href="https://schema.org/version">version</a> that is mandatory for each rdf-pub object. See {@link ActivityPubObjectRevision}
	 * @param identifier The <a href="https://schema.org/identifier">identifier</a> that is mandatory for each rdf-pub object. See {@link ActivityPubObjectRevision}
	 * @param types The {@link RDF#TYPE}s that should be queried.
	 * @param contexts The <a href="https://rdf4j.org/documentation/programming/repository/#using-named-graphscontext">graphs</a> that contain the created {@link ActivityPubObject}.
	 * @return The SPARQL query as a String.
*/
private String getFindSubjectQuery(BigInteger version, String identifier, Set<IRI> types, Set<IRI> contexts) {
		String from = contexts.stream()
				.map(ctx -> "FROM <" + ctx + ">")
				.collect(Collectors.joining("\n"));
StringBuilder queryStringBuilder = new StringBuilder();
queryStringBuilder.append(Prefix.OWL.value()).append("\n");
queryStringBuilder.append(Prefix.XML.value()).append("\n");
queryStringBuilder.append(Prefix.XSD.value()).append("\n");
queryStringBuilder.append(Prefix.RDFS.value()).append("\n");
queryStringBuilder.append(Prefix.AS.value()).append("\n");
queryStringBuilder.append(Prefix.RDF.value()).append("\n");
queryStringBuilder.append(Prefix.SCHEMA.value()).append("\n");
queryStringBuilder.append("SELECT * ").append("\n");
queryStringBuilder.append(from).append("\n");
queryStringBuilder.append("WHERE {").append("\n");
if(!types.isEmpty()) {
queryStringBuilder.append(getTypesQueryPart(new ArrayList<>(types)));
}
if(version!=null) {
queryStringBuilder.append(" ?subject schema:version %s .\n".formatted(version));
}
queryStringBuilder.append(" ?subject schema:identifier \"%s\" .".formatted(identifier));
queryStringBuilder.append("\n}");
String queryString = queryStringBuilder.toString();
log.trace("getFindSubjectQuery: \n" + queryString);
return queryString;
}
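	// Example output of getTypesQueryPart for two (hypothetical) types:
	//   ?subject rdf:type <https://www.w3.org/ns/activitystreams#Create> ;
	//    rdf:type <https://www.w3.org/ns/activitystreams#Note> .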
private String getTypesQueryPart(List<IRI> typesList) {
StringBuilder queryStringBuilder = new StringBuilder();
for (int i = 0; i < typesList.size(); i++) {
IRI type = typesList.get(i);
if(i==0) {
queryStringBuilder.append(" ?subject rdf:type <%s> ".formatted(type.toString()));
} else {
queryStringBuilder.append(" rdf:type <%s> ".formatted(type.toString()));
}
if (i==typesList.size()-1) {
queryStringBuilder.append(".\n");
} else {
queryStringBuilder.append(";\n");
}
}
return queryStringBuilder.toString();
}
@Override
public String dump(Set<IRI> contexts) {
try (RepositoryConnection con = repository.getConnection()) {
Model all = QueryResults.asModel(con.getStatements(null, null, null, toResourceArray(contexts)));
return ModelLogger.toString(all, RDFFormat.TRIGSTAR);
}
}
public String dump() {
try (RepositoryConnection con = repository.getConnection()) {
Model all = QueryResults.asModel(con.getStatements(null, null, null));
return ModelLogger.toString(all, RDFFormat.TRIGSTAR);
}
}
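	/**
	 * Delegates the passed SPARQL query to the {@link SparqlExecutor}, restricted to the given contexts.
	 * The result stream is serialized according to the passed Accept header.
	 */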
@Override
public OutputStream executeSparql(Set<IRI> contexts, String sparqlQuery, String headerAccept) throws IOException {
return sparqlExecutor.executeSparql(toStringArray(contexts), sparqlQuery, headerAccept);
}
}