Skip to content

Commit

Permalink
WIP: reuters poller business logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bryophyta committed Dec 23, 2024
1 parent 296c728 commit c983801
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 7 deletions.
36 changes: 36 additions & 0 deletions poller-lambdas/src/pollers/reuters/auth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { z } from 'zod';

const AuthSchema = z.object({
access_token: z.string(),
expires_in: z.number(),
token_type: z.string(),
});

type AuthData = z.infer<typeof AuthSchema>;

const scopes =
'https://api.thomsonreuters.com/auth/reutersconnect.contentapi.read https://api.thomsonreuters.com/auth/reutersconnect.contentapi.write';
const authUrl = 'https://auth.thomsonreuters.com/oauth/token';
const grantType = 'client_credentials';
const audience = '7a14b6a2-73b8-4ab2-a610-80fb9f40f769';

export async function auth(
clientId: string,
clientSecret: string,
): Promise<AuthData> {
const req = new Request(authUrl, {
method: 'POST',
headers: {
'content-type': 'application/x-www-form-urlencoded',
},
body: `grant_type=${grantType}&client_id=${clientId}&client_secret=${clientSecret}&audience=${audience}&scope=${encodeURIComponent(scopes)}`,
});
try {
const response = await fetch(req);
const data = (await response.json()) as unknown;
return AuthSchema.parse(data);
} catch (error) {
console.error(error);
throw new Error('Failed to get auth token from Reuters');
}
}
240 changes: 233 additions & 7 deletions poller-lambdas/src/pollers/reuters/reutersPoller.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,244 @@
import { parse } from 'node-html-parser';
import { z } from 'zod';
import type { IngestorInputBody } from '../../../../shared/types';
import type {
FixedFrequencyPollFunction,
PollerInput,
SecretValue,
} from '../../types';
import { auth } from './auth';

/**
* usn: unique story number. this is the same identifier as the guid
* version: The version of the news object which is identified by the uri property
* versionProcessed: timestamp when this version was processed in our internal systems
* versionedGuid: The globally unique identifier of the target item (guid) which also includes the version identifier
*/
const textItemsSearchQuery = `query NewTextItemsSearch {
search(filter: { mediaTypes: TEXT }) {
pageInfo {
endCursor
hasNextPage
}
items {
caption
headLine
uri
usn
versionedGuid
}
}
}`;

const NullishToStringOrUndefinedSchema = z
.string()
.nullish()
.transform((x) => x ?? undefined);

const SearchDataSchema = z.object({
data: z.object({
search: z.object({
items: z.array(
z.object({
caption: NullishToStringOrUndefinedSchema,
headLine: NullishToStringOrUndefinedSchema,
uri: NullishToStringOrUndefinedSchema,
usn: NullishToStringOrUndefinedSchema,
versionedGuid: NullishToStringOrUndefinedSchema,
}),
),
}),
}),
});

/**
* PF: As far as I can tell this should include basically everything that's in the NewsML download,
* so we shouldn't need to fetch that as well.
*/
function itemQuery(itemId: string) {
return `query ItemDetailQuery {
item(id: "${itemId}") {
byLine
copyrightNotice
versionCreated
fragment
headLine
versionedGuid
uri
language
type
profile
slug
usageTerms
usageTermsRole
version
credit
firstCreated
productLabel
pubStatus
urgency
usn
position
intro
bodyXhtml
bodyXhtmlRich
subject {
code
name
rel
}
}}`;
}

const ReutersItemSchema = z.object({
byLine: NullishToStringOrUndefinedSchema,
copyrightNotice: NullishToStringOrUndefinedSchema,
versionCreated: NullishToStringOrUndefinedSchema,
fragment: NullishToStringOrUndefinedSchema,
headLine: NullishToStringOrUndefinedSchema,
versionedGuid: z.string(),
uri: NullishToStringOrUndefinedSchema,
language: NullishToStringOrUndefinedSchema,
type: NullishToStringOrUndefinedSchema,
profile: NullishToStringOrUndefinedSchema,
slug: NullishToStringOrUndefinedSchema,
usageTerms: NullishToStringOrUndefinedSchema,
usageTermsRole: NullishToStringOrUndefinedSchema,
version: NullishToStringOrUndefinedSchema,
credit: NullishToStringOrUndefinedSchema,
firstCreated: NullishToStringOrUndefinedSchema,
productLabel: NullishToStringOrUndefinedSchema,
pubStatus: NullishToStringOrUndefinedSchema,
urgency: z
.number()
.nullish()
.transform((x) => x ?? undefined),
usn: NullishToStringOrUndefinedSchema,
position: NullishToStringOrUndefinedSchema,
bodyXhtml: NullishToStringOrUndefinedSchema,
bodyXhtmlRich: NullishToStringOrUndefinedSchema,
subject: z.array(
z.object({
code: NullishToStringOrUndefinedSchema,
name: NullishToStringOrUndefinedSchema,
rel: NullishToStringOrUndefinedSchema,
}),
),
});

const itemResponseSchema = z.object({
data: z.object({
item: ReutersItemSchema,
}),
});

function itemResponseToIngestionLambdaInput(
item: z.infer<typeof ReutersItemSchema>,
): IngestorInputBody {
const { bodyXhtmlRich, bodyXhtml } = item;
const bodyHtml = parse(bodyXhtmlRich ?? bodyXhtml ?? '').querySelector(
'body',
)?.innerHTML;
return {
originalContentText: item.bodyXhtmlRich,
uri: item.uri,
'source-feed': 'Reuters-Newswires',
usn: item.usn,
version: item.version,
type: item.type,
format: 'TODO',
mimeType: 'TODO',
firstVersion: item.firstCreated,
versionCreated: item.versionCreated,
dateTimeSent: item.versionCreated,
originalUrn: item.versionedGuid,
slug: item.slug,
headline: item.headLine,
byline: item.byLine,
priority: item.urgency?.toString() ?? '',
subjects: {
code: item.subject
.map((subject) => subject.code)
.filter((_): _ is string => _ !== undefined),
},
mediaCatCodes: '',
keywords: [],
organisation: { symbols: [] },
tabVtxt: 'X',
status: item.pubStatus,
usage: item.usageTerms,
ednote: '',
abstract: item.fragment,
body_text: bodyHtml,
copyrightNotice: item.copyrightNotice,
language: item.language,
};
}

const SecretValueSchema = z.object({
CLIENT_ID: z.string(),
CLIENT_SECRET: z.string(),
});

export const reutersPoller = (async (
_secret: SecretValue,
_input: PollerInput,
secret: SecretValue,
input: PollerInput,
) => {
console.log('Reuters poller running');
await new Promise((resolve) => setTimeout(resolve, 1000));
const parsedSecret = SecretValueSchema.safeParse(JSON.parse(secret));
if (!parsedSecret.success) {
throw new Error('Failed to parse secret value for Reuters poller');
}
const { CLIENT_ID, CLIENT_SECRET } = parsedSecret.data;
const { access_token } = await auth(
CLIENT_ID,
CLIENT_SECRET,
); /** @todo: the tokens are quite long-lived so we should check that there aren't any problems requesting one on each invocation. */

const searchResponse = await fetch(
'https://api.reutersconnect.com/content/graphql',
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${access_token}`,
},
body: JSON.stringify({
query: textItemsSearchQuery,
}),
},
);

const searchData = SearchDataSchema.parse(await searchResponse.json());

const itemsToFetch = searchData.data.search.items
.map((item) => item.versionedGuid)
.filter((guid): guid is string => guid !== undefined);

const itemResponses = await Promise.all(
itemsToFetch.map(async (itemId) => {
const itemResponse = await fetch(
'https://api.reutersconnect.com/content/graphql',
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${access_token}`,
},
body: JSON.stringify({
query: itemQuery(itemId),
}),
},
);
return itemResponseSchema.parse(await itemResponse.json());
}),
);

return {
payloadForIngestionLambda: [],
valueForNextPoll: '',
idealFrequencyInSeconds: 30,
payloadForIngestionLambda: itemResponses.map((response) => ({
externalId: response.data.item.versionedGuid,
body: itemResponseToIngestionLambdaInput(response.data.item),
})),
valueForNextPoll: input,
idealFrequencyInSeconds: 60,
};
}) satisfies FixedFrequencyPollFunction;

0 comments on commit c983801

Please sign in to comment.