ActivityPubObjectRepositoryDefault.java

package org.linkedopenactors.rdfpub.adapter.driven;

import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.base.CoreDatatype.RDF;
import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryResults;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryResult;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.linkedopenactors.rdfpub.domain.Activity;
import org.linkedopenactors.rdfpub.domain.ActivityPubObject;
import org.linkedopenactors.rdfpub.domain.ActivityPubObjectRevision;
import org.linkedopenactors.rdfpub.domain.ActivityPubObjectRevisionFactory;
import org.linkedopenactors.rdfpub.domain.IRI;
import org.linkedopenactors.rdfpub.domain.ResourceFactory;
import org.linkedopenactors.rdfpub.domain.service.ActivityPubObjectRepository;
import org.linkedopenactors.rdfpub.domain.service.SparqlRepository;
import org.linkedopenactors.rdfpub.rdf4j.ModelToRdfObject;
import org.linkedopenactors.rdfpub.rdf4j.RdfObjectToModel;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ResponseStatusException;

import de.naturzukunft.rdf4j.utils.ModelLogger;
import de.naturzukunft.rdf4j.vocabulary.AS;
import de.naturzukunft.rdf4j.vocabulary.SCHEMA_ORG;
import lombok.extern.slf4j.Slf4j;

/**
 * RDF4J-backed implementation of {@link ActivityPubObjectRepository} and {@link SparqlRepository}.
 * <p>
 * Every stored {@link ActivityPubObject} is written as the statements about its subject into the
 * named graphs (contexts) the object carries. Objects are looked up by their {@code schema:identifier}
 * and versioned by {@code schema:version}; when a version for an identifier already exists, a newly
 * saved version must be exactly one greater than the latest stored one.
 */
@Slf4j
@Component
class ActivityPubObjectRepositoryDefault implements ActivityPubObjectRepository, SparqlRepository {

	/** Name of the SPARQL variable that is pre-bound to the searched {@code schema:identifier} literal. */
	private static final String IDENTIFIER_VARIABLE = "identifier";

	private final Repository repository;
	private final RdfObjectToModel rdfObjectToModel;
	private final ModelToRdfObject modelToRdfObject;
	private final ResourceFactory resourceFactory;
	private final ActivityPubObjectRevisionFactory activityPubObjectRevisionFactory;
	private final SparqlExecutor sparqlExecutor;

	public ActivityPubObjectRepositoryDefault(Repository repository, RdfObjectToModel rdfObjectToModel, ModelToRdfObject modelToRdfObject,
			ResourceFactory resourceFactory, ActivityPubObjectRevisionFactory activityPubObjectRevisionFactory,
			SparqlExecutor sparqlExecutor) {
		this.repository = repository;
		this.rdfObjectToModel = rdfObjectToModel;
		this.modelToRdfObject = modelToRdfObject;
		this.resourceFactory = resourceFactory;
		this.activityPubObjectRevisionFactory = activityPubObjectRevisionFactory;
		this.sparqlExecutor = sparqlExecutor;
	}

	/**
	 * Reads all statements about {@code subject} from the given contexts and converts them to an object.
	 * The returned object additionally gets the contexts it was found in and the repository's namespaces.
	 *
	 * @param subject the subject to read.
	 * @param contexts the named graphs to read from; an empty set means no context restriction.
	 * @return the object, or empty if there are no statements about {@code subject} in {@code contexts}.
	 */
	@Override
	public Optional<ActivityPubObject> find(IRI subject, Set<IRI> contexts) {
		log.trace("->readRdfObject(subject: {}, contexts: {}", subject, contexts);
		Optional<ActivityPubObject> result;
		try (RepositoryConnection con = repository.getConnection()) {
			result = findWithMetadata(subject, contexts, con);
		}
		log.trace("<-readRdfObject({})", result);
		return result;
	}

	/**
	 * Like {@link #find(IRI, Set)}, but on an already open connection.
	 */
	private Optional<ActivityPubObject> findWithMetadata(IRI subject, Set<IRI> contexts, RepositoryConnection con) {
		Model resultModel = QueryResults.asModel(
				con.getStatements(Values.iri(subject.toString()), null, null, toResourceArray(contexts)));
		if (resultModel.isEmpty()) {
			return Optional.empty();
		}
		ActivityPubObject activityPubObject = createActivityPubObject(subject, resultModel);
		addContexts(activityPubObject, resultModel);
		copyNamespaces(activityPubObject, con);
		return Optional.of(activityPubObject);
	}

	/**
	 * Adds the named graphs of all statements in {@code resultModel} to the object's contexts.
	 * Statements in the default graph have a {@code null} context and are skipped, since they
	 * cannot be turned into an {@link IRI}.
	 */
	private void addContexts(ActivityPubObject activityPubObject, Model resultModel) {
		Set<IRI> contexts = resultModel.stream()
				.map(Statement::getContext)
				.filter(Objects::nonNull)
				.map(Resource::stringValue)
				.map(resourceFactory::iri)
				.collect(Collectors.toSet());
		activityPubObject.addContext(contexts);
	}

	/**
	 * Copies every namespace declared in the repository onto the object.
	 * The namespace result is closed after iteration.
	 */
	private void copyNamespaces(ActivityPubObject activityPubObject, RepositoryConnection con) {
		try (RepositoryResult<Namespace> namespaces = con.getNamespaces()) {
			for (Namespace namespace : namespaces) {
				activityPubObject.addNamespace(namespace.getPrefix(), resourceFactory.iri(namespace.getName()));
			}
		}
	}

	private ActivityPubObject createActivityPubObject(IRI subject, Model resultModel) {
		return modelToRdfObject.toActivityPubObject(Values.iri(subject.toString()), resultModel);
	}

	@Override
	public void save(ActivityPubObject activityPubObject) {
		saveObjects(List.of(activityPubObject));
	}

	/**
	 * Saves the objects of {@code rev} and then {@code activity}.
	 * NOTE(review): these are two separate transactions (each {@link #saveObjects(List)} call opens its own),
	 * so a failure while saving {@code activity} does not roll back the revision objects.
	 */
	@Override
	public void save(ActivityPubObject activity, ActivityPubObjectRevision rev) {
		saveObjects(rev.getActivityPubObjects());
		save(activity);
	}

	/**
	 * Saves {@code activity} and then {@code object}, each in its own transaction.
	 */
	@Override
	public void save(Activity activity, ActivityPubObject object) {
		save(activity);
		save(object);
	}

	/**
	 * Saves the passed objects in ascending version order within a single transaction.
	 * If saving any of them fails, the transaction is rolled back and the exception is rethrown.
	 *
	 * @param activityPubObjects the objects to save.
	 */
	public void saveObjects(List<? extends ActivityPubObject> activityPubObjects) {
		try (RepositoryConnection con = repository.getConnection()) {
			con.begin();
			try {
				activityPubObjects.stream()
						.sorted(Comparator.comparing(ActivityPubObject::getVersion))
						.forEach(rdfObject -> saveObject(rdfObject, con));
				con.commit();
			} catch (RuntimeException e) {
				rollback(con, e);
				log.error("error saving objects", e);
				throw e;
			}
		}
	}

	/**
	 * Rolls back the active transaction. If the rollback itself fails, that failure is attached to
	 * {@code cause} as a suppressed exception instead of replacing it.
	 */
	private static void rollback(RepositoryConnection con, RuntimeException cause) {
		try {
			con.rollback();
		} catch (RuntimeException rollbackFailure) {
			cause.addSuppressed(rollbackFailure);
		}
	}

	/**
	 * Finds the latest version of the object with the given identifier and types.
	 *
	 * @return the latest version, or empty if no matching object exists.
	 * @throws IllegalStateException if more than one subject matches the latest version.
	 */
	public Optional<ActivityPubObject> findLatestByIdentifierAndTypes(String identifier, Set<IRI> type,
			Set<IRI> contexts) {
		try (RepositoryConnection con = repository.getConnection()) {
			return findLatestVersion(identifier, type, contexts, con)
					.flatMap(latest -> findSubject(latest, identifier, type, contexts, con))
					.flatMap(subject -> findWithMetadata(subject, contexts, con));
		}
	}

	/**
	 * Finds all objects (of any type) with the given identifier that carry the latest version of that identifier.
	 *
	 * @return the matching objects; an empty (mutable) set if nothing matches.
	 */
	public Set<ActivityPubObject> findLatestByIdentifier(String identifier, Set<IRI> contexts) {
		try (RepositoryConnection con = repository.getConnection()) {
			return findLatestVersion(identifier, Collections.emptySet(), contexts, con)
					.map(latest -> findSubjects(latest, identifier, contexts, con).stream()
							.map(subject -> loadObject(subject, contexts, con))
							.collect(Collectors.toCollection(HashSet::new)))
					.orElseGet(HashSet::new);
		}
	}

	/**
	 * Collects all objects with the given identifier and exactly the given version into one revision.
	 *
	 * @return the revision, or empty if no object matches.
	 */
	@Override
	public Optional<ActivityPubObjectRevision> findByIdentifierAndVersion(String identifier, BigInteger version, Set<IRI> contexts) {
		try (RepositoryConnection con = repository.getConnection()) {
			ActivityPubObjectRevision activityPubObjectRevision = activityPubObjectRevisionFactory.create();
			QueryResults.asModel(con.getStatements(null, SCHEMA_ORG.identifier, Values.literal(identifier), toResourceArray(contexts)))
					.subjects().stream() // distinct subjects: an identifier stated in several graphs is loaded once
					.filter(Value::isIRI)
					// convert the RDF4J resource into the domain IRI (a plain cast fails at runtime)
					.map(Resource::stringValue)
					.map(resourceFactory::iri)
					.map(subject -> loadObject(subject, contexts, con))
					.filter(obj -> Objects.equals(obj.getVersion(), version))
					.forEach(activityPubObjectRevision::add);

			return activityPubObjectRevision.isEmpty() ? Optional.empty() : Optional.of(activityPubObjectRevision);
		}
	}

	/**
	 * Converts all statements about {@code subject} in {@code contexts} into an object,
	 * without adding contexts or namespaces.
	 */
	private ActivityPubObject loadObject(IRI subject, Set<IRI> contexts, RepositoryConnection con) {
		org.eclipse.rdf4j.model.IRI rdf4jIri = Values.iri(subject.toString());
		Model model = QueryResults.asModel(con.getStatements(rdf4jIri, null, null, toResourceArray(contexts)));
		return modelToRdfObject.toActivityPubObject(rdf4jIri, model);
	}

	/**
	 * Writes the statements about the object's subject into the object's contexts.
	 *
	 * @throws ResponseStatusException (400) if an object with the same version, identifier, types and contexts exists.
	 * @throws IllegalStateException if a version already exists and the new version is not latest + 1.
	 */
	private void saveObject(ActivityPubObject rdfObject, RepositoryConnection con) {
		log.trace("->saveObject");

		String identifier = rdfObject.getIdentifier();
		BigInteger version = rdfObject.getVersion();
		Set<IRI> types = rdfObject.types();
		Set<IRI> contexts = rdfObject.contexts();

		if (findSubject(version, identifier, types, contexts, con).isPresent()) {
			throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "object with version " + version + ", identifier: " + identifier + ", types: "+types+", contexts: "+contexts+" already exists!");
		}

		findLatestVersion(identifier, types, contexts, con)
				.ifPresent(latest -> verifyIsNextVersion(rdfObject, version, latest, con));

		Model model = rdfObjectToModel.convert(rdfObject);
		Model filtered = model.filter(Values.iri(rdfObject.getSubject().toString()), null, null);

		log.trace("saving {} statements. Identifier: {}; version: {}, types: {}", filtered.size(), identifier, version, types);
		ModelLogger.trace(log, filtered, RDFFormat.TRIGSTAR, "saving ");

		con.add(filtered, toResourceArray(contexts));
		filtered.getNamespaces().forEach(ns -> con.setNamespace(ns.getPrefix(), ns.getName()));
	}

	/**
	 * Throws if {@code version} is not exactly {@code latest + 1}.
	 * Before throwing, the statements already stored for the subject are trace-logged.
	 */
	private void verifyIsNextVersion(ActivityPubObject rdfObject, BigInteger version, BigInteger latest, RepositoryConnection con) {
		BigInteger expected = latest.add(BigInteger.ONE);
		// BigInteger is a value type: compare with equals(), '!=' would compare object references.
		if (!expected.equals(version)) {
			String msg = "the latest version of "+rdfObject.getSubject()+" is "+latest+" the next expected version is " + expected + ", but you give me " + version;
			log.error("{}\n{}", msg, rdfObject);
			RepositoryResult<Statement> result = con.getStatements(Values.iri(rdfObject.getSubject().toString()), null, null);
			ModelLogger.trace(log, QueryResults.asModel(result), "subject in database");
			throw new IllegalStateException(msg);
		}
	}

	/** Converts domain IRIs to distinct RDF4J context resources. */
	private Resource[] toResourceArray(Set<IRI> contexts) {
		return contexts.stream()
				.map(Object::toString)
				.distinct()
				.map(Values::iri)
				.toArray(Resource[]::new);
	}

	/** Converts domain IRIs to their distinct string forms. */
	private String[] toStringArray(Set<IRI> contexts) {
		return contexts.stream()
				.map(Object::toString)
				.distinct()
				.toArray(String[]::new);
	}

	/**
	 * Builds one {@code FROM <graph>} line per context, joined by newlines.
	 * An empty set yields an empty string, i.e. no FROM clause.
	 */
	private static String fromClause(Set<IRI> contexts) {
		return contexts.stream()
				.map(ctx -> "FROM <" + ctx + ">")
				.collect(Collectors.joining("\n"));
	}

	/**
	 * Finds the single subject for the given version, identifier, types and contexts.
	 *
	 * @throws IllegalStateException if more than one subject matches.
	 */
	private Optional<IRI> findSubject(BigInteger version, String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
		Set<IRI> subjects = findSubjectInternal(version, identifier, types, contexts, con);
		if (subjects.size() > 1) {
			throw new IllegalStateException("more than one subject for version: " + version + ", identifier: " + identifier + ", types: " + types);
		}
		return subjects.stream().findFirst();
	}

	/** Finds all subjects (of any type) with the given version and identifier. */
	private Set<IRI> findSubjects(BigInteger version, String identifier, Set<IRI> contexts, RepositoryConnection con) {
		return findSubjectInternal(version, identifier, Collections.emptySet(), contexts, con);
	}

	/** Returns the highest stored version for the identifier/types, or empty if there is none. */
	private Optional<BigInteger> findLatestVersion(String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
		Optional<BigInteger> max = findVersions(identifier, types, contexts, con).stream().max(Comparator.naturalOrder());
		log.trace("findLatestVersion({},{}):  {}", identifier, types, max);
		return max;
	}

	/**
	 * Collects all integer {@code schema:version} values of subjects that have the given identifier
	 * and all of the given types in the given contexts.
	 */
	private Set<BigInteger> findVersions(String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
		StringBuilder queryStringBuilder = new StringBuilder();
		queryStringBuilder.append("""
				PREFIX %s: <%s>
				PREFIX %s: <%s>
				SELECT *
				%s
				WHERE {
				""".formatted(AS.PREFIX, AS.NAMESPACE, SCHEMA_ORG.PREFIX, SCHEMA_ORG.NAMESPACE, fromClause(contexts)));
		queryStringBuilder.append(getTypesQueryPart(new ArrayList<>(types)));
		// NOTE(review): the identifier predicate is spelled out as https://schema.org/identifier here,
		// while getFindSubjectQuery uses the schema: prefix - confirm both resolve to the same IRI.
		queryStringBuilder.append("""
				    ?subject schema:version ?version .
				    ?subject <https://schema.org/identifier> ?%s .
				}
				""".formatted(IDENTIFIER_VARIABLE));

		String queryString = queryStringBuilder.toString();
		log.trace("findVersions({},{}): \n{}", identifier, types, queryString);
		TupleQuery tupleQuery = con.prepareTupleQuery(queryString);
		// Bind the identifier instead of splicing it into the query text, so quotes or backslashes
		// in an identifier can neither break nor alter the query.
		tupleQuery.setBinding(IDENTIFIER_VARIABLE, Values.literal(identifier));

		Set<BigInteger> versions = new HashSet<>();
		try (TupleQueryResult result = tupleQuery.evaluate()) {
			log.trace("findVersions hasResult: {}", result.hasNext());
			while (result.hasNext()) {
				Value version = result.next().getValue("version");
				if (version.isLiteral()) {
					versions.add(((Literal) version).integerValue());
				}
			}
		}
		log.trace("findVersions({},{}): {}", identifier, types, versions);
		return versions;
	}

	/**
	 * Finds subjects in the passed contexts from the passed types matching the passed identifier and version.
	 * @param version The <a href="https://schema.org/version">version</a> that is mandatory for each rdf-pub object. See {@link ActivityPubObjectRevision}. {@code null} means any version.
	 * @param identifier The <a href="https://schema.org/identifier">identifier</a> that is mandatory for each rdf-pub object. See {@link ActivityPubObjectRevision}
	 * @param types The {@code rdf:type}s the subject must all have; empty means no type restriction.
	 * @param contexts The <a href="https://rdf4j.org/documentation/programming/repository/#using-named-graphscontext">graphs</a> that contain the created {@link ActivityPubObject}.
	 * @param con {@link RepositoryConnection} to use for searching.
	 * @return {@link Set} of {@link IRI} with the subjects for the passed arguments.
	 * @throws IllegalStateException if a matching subject is not an IRI.
	 */
	private Set<IRI> findSubjectInternal(BigInteger version, String identifier, Set<IRI> types, Set<IRI> contexts, RepositoryConnection con) {
		String queryString = getFindSubjectQuery(version, types, contexts);
		log.trace("findSubjectInternal queryString (identifier: {}): {}", identifier, queryString);
		TupleQuery tupleQuery = con.prepareTupleQuery(queryString);
		// The query references ?identifier; bind it rather than concatenating user-supplied text.
		tupleQuery.setBinding(IDENTIFIER_VARIABLE, Values.literal(identifier));

		Set<IRI> subjects = new HashSet<>();
		try (TupleQueryResult result = tupleQuery.evaluate()) {
			log.trace("findSubject hasResult: {}", result.hasNext());
			while (result.hasNext()) {
				Value v = result.next().getValue("subject");
				if (!v.isIRI()) {
					throw new IllegalStateException("no IRI: " + v);
				}
				subjects.add(resourceFactory.iri(v.stringValue()));
			}
		}
		log.trace("findSubjectInternal size: {}", subjects.size());
		return subjects;
	}

	/**
	 * Constructs a 'FindSubjectQuery'. The identifier is not part of the query text; it is matched
	 * against the variable {@code ?identifier}, which the caller must bind.
	 * @param version The <a href="https://schema.org/version">version</a> that is mandatory for each rdf-pub object. See {@link ActivityPubObjectRevision}. {@code null} omits the version pattern.
	 * @param types The {@code rdf:type}s the subject must all have; empty means no type restriction.
	 * @param contexts The <a href="https://rdf4j.org/documentation/programming/repository/#using-named-graphscontext">graphs</a> that contain the created {@link ActivityPubObject}.
	 * @return The 'FindSubjectQuery' as String.
	 */
	private String getFindSubjectQuery(BigInteger version, Set<IRI> types, Set<IRI> contexts) {
		StringBuilder queryStringBuilder = new StringBuilder();
		queryStringBuilder.append(Prefix.OWL.value()).append("\n");
		queryStringBuilder.append(Prefix.XML.value()).append("\n");
		queryStringBuilder.append(Prefix.XSD.value()).append("\n");
		queryStringBuilder.append(Prefix.RDFS.value()).append("\n");
		queryStringBuilder.append(Prefix.AS.value()).append("\n");
		queryStringBuilder.append(Prefix.RDF.value()).append("\n");
		queryStringBuilder.append(Prefix.SCHEMA.value()).append("\n");
		queryStringBuilder.append("SELECT * ").append("\n");
		queryStringBuilder.append(fromClause(contexts)).append("\n");
		queryStringBuilder.append("WHERE {").append("\n");

		if (!types.isEmpty()) {
			queryStringBuilder.append(getTypesQueryPart(new ArrayList<>(types)));
		}

		if (version != null) {
			// BigInteger.toString() yields a plain integer, which is safe to embed as a SPARQL literal.
			queryStringBuilder.append("    ?subject schema:version %s .\n".formatted(version));
		}

		queryStringBuilder.append("    ?subject schema:identifier ?%s .".formatted(IDENTIFIER_VARIABLE));
		queryStringBuilder.append("\n}");

		String queryString = queryStringBuilder.toString();
		log.trace("getFindSubjectQuery: \n{}", queryString);
		return queryString;
	}

	/**
	 * Builds a triple-pattern group requiring {@code ?subject} to have every given {@code rdf:type},
	 * chained with ';' and terminated by '.'. Returns an empty string for an empty list.
	 */
	private String getTypesQueryPart(List<IRI> typesList) {
		StringBuilder queryStringBuilder = new StringBuilder();
		for (int i = 0; i < typesList.size(); i++) {
			IRI type = typesList.get(i);
			if (i == 0) {
				queryStringBuilder.append("    ?subject rdf:type <%s> ".formatted(type.toString()));
			} else {
				queryStringBuilder.append("             rdf:type <%s> ".formatted(type.toString()));
			}
			if (i == typesList.size() - 1) {
				queryStringBuilder.append(".\n");
			} else {
				queryStringBuilder.append(";\n");
			}
		}
		return queryStringBuilder.toString();
	}

	/** Serializes all statements in the given contexts as TriG-star. */
	@Override
	public String dump(Set<IRI> contexts) {
		try (RepositoryConnection con = repository.getConnection()) {
			Model all = QueryResults.asModel(con.getStatements(null, null, null, toResourceArray(contexts)));
			return ModelLogger.toString(all, RDFFormat.TRIGSTAR);
		}
	}

	/** Serializes every statement in the repository as TriG-star. */
	public String dump() {
		try (RepositoryConnection con = repository.getConnection()) {
			Model all = QueryResults.asModel(con.getStatements(null, null, null));
			return ModelLogger.toString(all, RDFFormat.TRIGSTAR);
		}
	}

	/** Delegates the query to the {@link SparqlExecutor}, restricted to the given contexts. */
	@Override
	public OutputStream executeSparql(Set<IRI> contexts, String sparqlQuery, String headerAccept) throws IOException {
		return sparqlExecutor.executeSparql(toStringArray(contexts), sparqlQuery, headerAccept);
	}
}