Skip to content

Commit

Permalink
feat: redrive piece accept
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Shaw authored and alanshaw committed Oct 2, 2024
1 parent caae822 commit b093f2b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 50 deletions.
3 changes: 2 additions & 1 deletion tools/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"sade": "^1.8.1"
},
"dependencies": {
"archy": "^1.0.0"
"archy": "^1.0.0",
"p-retry": "^6.2.0"
}
}
108 changes: 59 additions & 49 deletions tools/redrive-piece-accept.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Aggregator } from '@web3-storage/filecoin-client'
import { QueryCommand } from '@aws-sdk/client-dynamodb'
import { Piece } from '@web3-storage/data-segment'
import map from 'p-map'
import retry, { AbortError } from 'p-retry'

Check failure on line 20 in tools/redrive-piece-accept.js

View workflow job for this annotation

GitHub Actions / Test

'AbortError' is declared but its value is never read.
import { getServiceSigner } from '../filecoin/service.js'
import { createPieceTable } from '../filecoin/store/piece.js'
import { createReceiptStore } from '../filecoin/store/receipt.js'
Expand All @@ -27,7 +28,7 @@ import { unmarshall } from '@aws-sdk/util-dynamodb'

/** @typedef {import('@web3-storage/upload-api').PieceLink} PieceLink */

const CONCURRENCY = 10
const CONCURRENCY = 50

export const redrivePieceAccept = async () => {
const stage = getStage()
Expand All @@ -50,15 +51,18 @@ export const redrivePieceAccept = async () => {

let total = 0
// query submitted status pieces (they are orderd by oldest timestamp with sort key)
for await (const submittedPieces of pieceStore.query({ status: 'submitted' })) {
/** @type {string|undefined} */
let cursor
do {
const submittedPieces = await pieceStore.query({ status: 'submitted' }, { cursor })
if (submittedPieces.error) {
throw new Error('failed to get submitted pieces', { cause: submittedPieces.error })
}

await map(submittedPieces.ok.entries(), async ([i, record]) => {
await map(submittedPieces.ok.results.entries(), async ([i, record]) => {
total++
const pfx = `${record.piece}:`
console.log(`${pfx} ${record.insertedAt} (${(i + 1).toLocaleString()} of ${submittedPieces.ok.length.toLocaleString()} total: ${total})`)
console.log(`${pfx} ${record.insertedAt} (${(i + 1).toLocaleString()} of ${submittedPieces.ok.results.length.toLocaleString()} total: ${total})`)

const filecoinSubmitInvocation = await StorefrontCaps.filecoinSubmit
.invoke({
Expand All @@ -74,59 +78,65 @@ export const redrivePieceAccept = async () => {
.delegate()
console.log(`${pfx} filecoin/submit: ${filecoinSubmitInvocation.link()}`)

const filecoinSubmitReceiptGet = await receiptStore.get(filecoinSubmitInvocation.link())
if (filecoinSubmitReceiptGet.error) {
return console.error(`${pfx} missing filecoin/submit receipt`)
}
if (!filecoinSubmitReceiptGet.ok.fx.join) {
return console.error(`${pfx} missing filecoin/submit next task`)
}
console.log(`${pfx} piece/offer: ${filecoinSubmitReceiptGet.ok.fx.join}`)
await retry(async () => {
const filecoinSubmitReceiptGet = await receiptStore.get(filecoinSubmitInvocation.link())
if (filecoinSubmitReceiptGet.error) {
return console.error(`${pfx} missing filecoin/submit receipt`)
}
if (!filecoinSubmitReceiptGet.ok.fx.join) {
return console.error(`${pfx} missing filecoin/submit next task`)
}
console.log(`${pfx} piece/offer: ${filecoinSubmitReceiptGet.ok.fx.join}`)

const pieceOfferReceiptGet = await receiptStore.get(filecoinSubmitReceiptGet.ok.fx.join.link())
if (pieceOfferReceiptGet.error) {
return console.error(`${pfx} missing piece/offer receipt`)
}
if (!pieceOfferReceiptGet.ok.fx.join) {
return console.error(`${pfx} missing piece/offer next task`)
}
console.log(`${pfx} piece/accept: ${pieceOfferReceiptGet.ok.fx.join}`)
const pieceOfferReceiptGet = await receiptStore.get(filecoinSubmitReceiptGet.ok.fx.join.link())
if (pieceOfferReceiptGet.error) {
return console.error(`${pfx} missing piece/offer receipt`)
}
if (!pieceOfferReceiptGet.ok.fx.join) {
return console.error(`${pfx} missing piece/offer next task`)
}
console.log(`${pfx} piece/accept: ${pieceOfferReceiptGet.ok.fx.join}`)

const pieceAcceptReceiptGet = await receiptStore.get(pieceOfferReceiptGet.ok.fx.join.link())
if (pieceAcceptReceiptGet.ok) {
console.log(`${pfx} receipt exists - skipping`)
return
}
const pieceAcceptReceiptGet = await receiptStore.get(pieceOfferReceiptGet.ok.fx.join.link())
if (pieceAcceptReceiptGet.ok) {
console.log(`${pfx} receipt exists - skipping`)
return
}

const inclusionList = await inclusionStore.list(record.piece)
if (inclusionList.error) {
throw new Error(`failed to list inclusions ${record.piece}`, { cause: inclusionList.error })
}
if (!inclusionList.ok.length) {
console.warn(`${pfx} no inclusion for piece: ${record.piece}`)
return
}
const inclusionList = await inclusionStore.list(record.piece)
if (inclusionList.error) {
throw new Error(`failed to list inclusions ${record.piece}`, { cause: inclusionList.error })
}
if (!inclusionList.ok.length) {
console.warn(`${pfx} no inclusion for piece: ${record.piece}`)
return
}

for (const inclusion of inclusionList.ok) {
console.log(`${pfx} aggregate: ${inclusion.aggregate} group: ${inclusion.group}`)
for (const inclusion of inclusionList.ok) {
console.log(`${pfx} aggregate: ${inclusion.aggregate} group: ${inclusion.group}`)

const pieceAcceptInvocation = await Aggregator.pieceAccept({
issuer: aggregatorServiceSigner,
with: aggregatorServiceSigner.did()
}, record.piece, inclusion.group)
if (pieceAcceptInvocation.out.error) {
throw new Error(`failed piece/accept invocation: ${record.piece}`, { cause: pieceAcceptInvocation.out.error })
}
const pieceAcceptInvocation = await Aggregator.pieceAccept({
issuer: aggregatorServiceSigner,
with: aggregatorServiceSigner.did()
}, record.piece, inclusion.group)
if (pieceAcceptInvocation.out.error) {
throw new Error(`failed piece/accept invocation: ${record.piece}`, { cause: pieceAcceptInvocation.out.error })
}

const pieceAcceptReceiptGet = await receiptStore.get(pieceOfferReceiptGet.ok.fx.join.link())
if (!pieceAcceptReceiptGet.ok) {
throw new Error(`receipt does not exist even after piece accept invocation: ${record.piece}`)
}
const pieceAcceptReceiptGet = await receiptStore.get(pieceOfferReceiptGet.ok.fx.join.link())
if (!pieceAcceptReceiptGet.ok) {
throw new Error(`receipt does not exist even after piece accept invocation: ${record.piece}`)
}

console.log(`${pfx} ✅ piece/accept receipt issued`)
}
console.log(`${pfx} ✅ piece/accept receipt issued`)
}
}, { onFailedAttempt: err => console.warn(pfx, err) })
}, { concurrency: CONCURRENCY })
}

cursor = submittedPieces.ok.cursor
} while (cursor)

console.log('Done!')
}

const getEnv = () => ({
Expand Down

0 comments on commit b093f2b

Please sign in to comment.