Skip to content

Commit

Permalink
EventsQuery by protocol (#762)
Browse files Browse the repository at this point in the history
This PR is to implement a `protocol` filter for the `EventsQuery`
interface which accounts for receiving permission messages scoped to the
filtered protocol via tag filters, as well as other messages directly
indexed to the protocol.

It also resolves this issue
#663, which simplifies
the type of filters `EventsQuery` implements to reduce complexity of
which messages may or may not be returned with the filter.

Since `EventsQuery` is being used for sync/selective sync, the
`protocol` filter is most important, if further filters are needed down
the line, we will implement them according to scenarios put in place so
we can make sure the correct messages arrive with the query results.

The Additional filters I left are for `interface`, `method` and
`messageTimestamp` which are compatible with all types of messages.

NOTE: I cleaned up something I didn't like with `EventsQuery` from the
prior PR instead of the message descriptor having an undefined filter,
it is forced to have an empty filters array.

NOTE2: `EventsSubscribe` does not yet handle filtering for protocols
which include the permissions messages, that will be done in a separate
PR.
  • Loading branch information
LiranCohen authored Jun 21, 2024
1 parent 4866891 commit 92b6792
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 1,617 deletions.
77 changes: 1 addition & 76 deletions json-schemas/interface-methods/events-filter.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,7 @@
"protocol": {
"type": "string"
},
"protocolPath": {
"type": "string"
},
"recipient": {
"$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/$defs/did"
},
"contextId": {
"type": "string"
},
"schema": {
"type": "string"
},
"recordId": {
"type": "string"
},
"parentId": {
"type": "string"
},
"dataFormat": {
"type": "string"
},
"dataSize": {
"$ref": "https://identity.foundation/dwn/json-schemas/number-range-filter.json"
},
"dateCreated": {
"messageTimestamp": {
"type": "object",
"minProperties": 1,
"additionalProperties": false,
Expand All @@ -59,57 +35,6 @@
"$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/$defs/date-time"
}
}
},
"datePublished": {
"type": "object",
"minProperties": 1,
"additionalProperties": false,
"properties": {
"from": {
"$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/$defs/date-time"
},
"to": {
"$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/$defs/date-time"
}
}
},
"dateUpdated": {
"type": "object",
"minProperties": 1,
"additionalProperties": false,
"properties": {
"from": {
"$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/$defs/date-time"
},
"to": {
"$ref": "https://identity.foundation/dwn/json-schemas/defs.json#/$defs/date-time"
}
}
}
},
"dependencies": {
"datePublished": {
"oneOf": [
{
"properties": {
"published": {
"enum": [
true
]
}
},
"required": [
"published"
]
},
{
"not": {
"required": [
"published"
]
}
}
]
}
}
}
4 changes: 2 additions & 2 deletions json-schemas/interface-methods/events-query.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"required": [
"interface",
"method",
"messageTimestamp"
"messageTimestamp",
"filters"
],
"properties": {
"interface": {
Expand All @@ -37,7 +38,6 @@
},
"filters": {
"type": "array",
"minItems": 1,
"items": {
"$ref": "https://identity.foundation/dwn/json-schemas/events-filter.json"
}
Expand Down
6 changes: 2 additions & 4 deletions src/handlers/events-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ export class EventsQueryHandler implements MethodHandler {
return messageReplyFromError(e, 401);
}

// if no filter is present in the the `EventsQuery` descriptor, we pass an empty array of filters to the `queryEvents` method
// this will return all events in the event log for the given tenant beyond the cursor provided.
// if no cursor is provided, it will return all events
const eventFilters = message.descriptor.filters ? Events.convertFilters(message.descriptor.filters) : [];
// an empty array of filters means no filtering and all events are returned
const eventFilters = Events.convertFilters(message.descriptor.filters);
const { events, cursor } = await this.eventLog.queryEvents(tenant, eventFilters, message.descriptor.cursor);

return {
Expand Down
9 changes: 3 additions & 6 deletions src/interfaces/events-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { Events } from '../utils/events.js';
import { Message } from '../core/message.js';
import { removeUndefinedProperties } from '../utils/object.js';
import { Time } from '../utils/time.js';
import { validateProtocolUrlNormalized } from '../utils/url.js';
import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
import { validateProtocolUrlNormalized, validateSchemaUrlNormalized } from '../utils/url.js';

export type EventsQueryOptions = {
signer: Signer;
Expand All @@ -23,13 +23,10 @@ export class EventsQuery extends AbstractMessage<EventsQueryMessage>{
Message.validateJsonSchema(message);
await Message.validateSignatureStructure(message.authorization.signature, message.descriptor);

for (const filter of message.descriptor.filters || []) {
for (const filter of message.descriptor.filters) {
if ('protocol' in filter && filter.protocol !== undefined) {
validateProtocolUrlNormalized(filter.protocol);
}
if ('schema' in filter && filter.schema !== undefined) {
validateSchemaUrlNormalized(filter.schema);
}
}

return new EventsQuery(message);
Expand All @@ -39,7 +36,7 @@ export class EventsQuery extends AbstractMessage<EventsQueryMessage>{
const descriptor: EventsQueryDescriptor = {
interface : DwnInterfaceName.Events,
method : DwnMethodName.Query,
filters : options.filters ? Events.normalizeFilters(options.filters) : undefined,
filters : options.filters ? Events.normalizeFilters(options.filters) : [],
messageTimestamp : options.messageTimestamp ?? Time.getCurrentTimestamp(),
cursor : options.cursor,
};
Expand Down
5 changes: 1 addition & 4 deletions src/interfaces/events-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { AbstractMessage } from '../core/abstract-message.js';
import { Message } from '../core/message.js';
import { removeUndefinedProperties } from '../utils/object.js';
import { Time } from '../utils/time.js';
import { validateProtocolUrlNormalized } from '../utils/url.js';
import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
import { validateProtocolUrlNormalized, validateSchemaUrlNormalized } from '../utils/url.js';


export type EventsSubscribeOptions = {
Expand All @@ -24,9 +24,6 @@ export class EventsSubscribe extends AbstractMessage<EventsSubscribeMessage> {
if ('protocol' in filter && filter.protocol !== undefined) {
validateProtocolUrlNormalized(filter.protocol);
}
if ('schema' in filter && filter.schema !== undefined) {
validateSchemaUrlNormalized(filter.schema);
}
}

Time.validateTimestamp(message.descriptor.messageTimestamp);
Expand Down
30 changes: 4 additions & 26 deletions src/types/events-types.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,17 @@
import type { MessageEvent } from './subscriptions.js';
import type { AuthorizationModel, GenericMessage, GenericMessageReply, MessageSubscription } from './message-types.js';
import type { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
import type { PaginationCursor, RangeCriterion, RangeFilter } from './query-types.js';
import type { PaginationCursor, RangeCriterion } from './query-types.js';
/**
* filters used when filtering for any type of Message across interfaces
*/
export type EventsMessageFilter = {
export type EventsFilter = {
interface?: string;
method?: string;
dateUpdated?: RangeCriterion;
};

/**
* We only allow filtering for events by immutable properties, the omitted properties could be different per subsequent writes.
*/
export type EventsRecordsFilter = {
recipient?: string;
protocol?: string;
protocolPath?: string;
contextId?: string;
schema?: string;
recordId?: string;
parentId?: string;
dataFormat?: string;
dataSize?: RangeFilter;
dateCreated?: RangeCriterion;
messageTimestamp?: RangeCriterion;
};


/**
* A union type of the different types of filters a user can use when issuing an EventsQuery or EventsSubscribe
* TODO: simplify the EventsFilters to only the necessary in order to reduce complexity https://github.com/TBD54566975/dwn-sdk-js/issues/663
*/
export type EventsFilter = EventsMessageFilter | EventsRecordsFilter;

export type MessageSubscriptionHandler = (event: MessageEvent) => void;

export type EventsSubscribeMessageOptions = {
Expand All @@ -60,7 +38,7 @@ export type EventsQueryDescriptor = {
interface: DwnInterfaceName.Events;
method: DwnMethodName.Query;
messageTimestamp: string;
filters?: EventsFilter[];
filters: EventsFilter[];
cursor?: PaginationCursor;
};

Expand Down
78 changes: 46 additions & 32 deletions src/utils/events.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { EventsFilter } from '../types/events-types.js';
import type { Filter } from '../types/query-types.js';
import type { EventsFilter, EventsMessageFilter, EventsRecordsFilter } from '../types/events-types.js';

import { FilterUtility } from '../utils/filter.js';
import { normalizeProtocolUrl } from './url.js';
import { PermissionsProtocol } from '../protocols/permissions.js';
import { Records } from '../utils/records.js';
import { isEmptyObject, removeUndefinedProperties } from './object.js';

Expand All @@ -17,15 +19,15 @@ export class Events {

const eventsQueryFilters: EventsFilter[] = [];

// normalize each filter individually by the type of filter it is.
// normalize each filter, and only add non-empty filters to the returned array
for (const filter of filters) {
let eventsFilter: EventsFilter;
if (this.isRecordsFilter(filter)) {
eventsFilter = Records.normalizeFilter(filter);
} else {
// no normalization needed
eventsFilter = filter;
}
// normalize the protocol URL if it exists
const protocol = filter.protocol !== undefined ? normalizeProtocolUrl(filter.protocol) : undefined;

const eventsFilter = {
...filter,
protocol,
};

// remove any empty filter properties and do not add if empty
removeUndefinedProperties(eventsFilter);
Expand All @@ -34,7 +36,6 @@ export class Events {
}
}


return eventsQueryFilters;
}

Expand All @@ -53,43 +54,56 @@ export class Events {
// first we check for `EventsRecordsFilter` fields for conversion
// otherwise it is `EventsMessageFilter` fields for conversion
for (const filter of filters) {
if (this.isRecordsFilter(filter)) {
eventsQueryFilters.push(Records.convertFilter(filter));
} else {
eventsQueryFilters.push(this.convertFilter(filter));
// extract the protocol tag filter from the incoming event record filter
// this filters for permission grants, requests and revocations associated with a targeted protocol
// since permissions are their own protocol, we added an additional tag index when writing the permission messages
// so that we can filter for permission records here
const permissionRecordsFilter = this.constructPermissionRecordsFilter(filter);
if (permissionRecordsFilter) {
eventsQueryFilters.push(permissionRecordsFilter);
}

eventsQueryFilters.push(this.convertFilter(filter));
}

return eventsQueryFilters;
}

/**
* Constructs a filter that gets associated permission records if protocol is in the given filter.
*/
private static constructPermissionRecordsFilter(filter: EventsFilter): Filter | undefined {
const { protocol, messageTimestamp } = filter;
if (protocol !== undefined) {
const taggedFilter = {
protocol: PermissionsProtocol.uri,
...Records.convertTagsFilter({ protocol })
} as Filter;

if (messageTimestamp != undefined) {
// if we filter by message timestamp, we also want to filter the permission messages by the same timestamp range
const messageTimestampFilter = FilterUtility.convertRangeCriterion(messageTimestamp);
if (messageTimestampFilter) {
taggedFilter.messageTimestamp = messageTimestampFilter;
}
}

return taggedFilter;
}
}

/**
* Converts an external-facing filter model into an internal-facing filer model used by data store.
*/
private static convertFilter(filter: EventsMessageFilter): Filter {
private static convertFilter(filter: EventsFilter): Filter {
const filterCopy = { ...filter } as Filter;

const { dateUpdated } = filter;
const messageTimestampFilter = dateUpdated ? FilterUtility.convertRangeCriterion(dateUpdated) : undefined;
const { messageTimestamp } = filter;
const messageTimestampFilter = messageTimestamp ? FilterUtility.convertRangeCriterion(messageTimestamp) : undefined;
if (messageTimestampFilter) {
filterCopy.messageTimestamp = messageTimestampFilter;
delete filterCopy.dateUpdated;
}
return filterCopy as Filter;
}

// we deliberately do not check for `dateUpdated` in this filter.
// if it were the only property that matched, it could be handled by `EventsFilter`
private static isRecordsFilter(filter: EventsFilter): filter is EventsRecordsFilter {
return 'author' in filter ||
'dateCreated' in filter ||
'dataFormat' in filter ||
'dataSize' in filter ||
'parentId' in filter ||
'recordId' in filter ||
'schema' in filter ||
'protocol' in filter ||
'protocolPath' in filter ||
'recipient' in filter;
}
}
2 changes: 1 addition & 1 deletion src/utils/records.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ export class Records {
/**
* This will create individual keys for each of the tag filters that look like `tag.tag_filter_property`
*/
private static convertTagsFilter( tags: { [property: string]: RecordsWriteTagsFilter}): Filter {
public static convertTagsFilter( tags: { [property: string]: RecordsWriteTagsFilter}): Filter {
const tagValues:Filter = {};
for (const property in tags) {
const value = tags[property];
Expand Down
Loading

0 comments on commit 92b6792

Please sign in to comment.