Skip to content

Commit dbb780c

Browse files
committed
feat: add GetJobRuns to Job Distributor
1 parent 33c87e5 commit dbb780c

File tree

24 files changed

+656
-63
lines changed

24 files changed

+656
-63
lines changed

.changeset/lemon-keys-sniff.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink": minor
3+
---
4+
5+
#added GetJobRuns to Job Distributor

core/scripts/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ require (
482482
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20250717121125-2350c82883e2 // indirect
483483
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250729142306-508e798f6a5d // indirect
484484
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976 // indirect
485-
github.com/smartcontractkit/chainlink-protos/orchestrator v0.8.1 // indirect
485+
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0 // indirect
486486
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
487487
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
488488
github.com/smartcontractkit/chainlink-protos/svr v1.1.0 // indirect

core/scripts/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,8 +1568,8 @@ github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250829155125-f4655b
15681568
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250829155125-f4655b0b4605/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q=
15691569
github.com/smartcontractkit/chainlink-protos/job-distributor v0.13.1 h1:PWwLGimBt37eDzpbfZ9V/ZkW4oCjcwKjKiAwKlSfPc0=
15701570
github.com/smartcontractkit/chainlink-protos/job-distributor v0.13.1/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE=
1571-
github.com/smartcontractkit/chainlink-protos/orchestrator v0.8.1 h1:VcFo27MBPTMB1d1Tp3q3RzJNqwErKR+z9QLQZ6KBSXo=
1572-
github.com/smartcontractkit/chainlink-protos/orchestrator v0.8.1/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo=
1571+
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0 h1:0eroOyBwmdoGUwUdvMI0/J7m5wuzNnJDMglSOK1sfNY=
1572+
github.com/smartcontractkit/chainlink-protos/orchestrator v0.10.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo=
15731573
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA=
15741574
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w=
15751575
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+CMJ26elVw/cAJqqhBQ3Xa/mBYWK0/rQ5MuI=

core/services/feeds/mocks/orm.go

Lines changed: 58 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/services/feeds/mocks/service.go

Lines changed: 61 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/services/feeds/models_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ func Test_NewChainType(t *testing.T) {
4747
}
4848

4949
for _, tt := range tests {
50-
tt := tt
51-
5250
t.Run(tt.name, func(t *testing.T) {
5351
ct, err := NewChainType(tt.give)
5452

@@ -180,8 +178,6 @@ func Test_OCR1Config_Value(t *testing.T) {
180178
}
181179

182180
for _, tt := range tests {
183-
tt := tt
184-
185181
t.Run(tt.name, func(t *testing.T) {
186182
val, err := tt.give.Value()
187183
require.NoError(t, err)
@@ -244,8 +240,6 @@ func Test_OCR1Config_Scan(t *testing.T) {
244240
}
245241

246242
for _, tt := range tests {
247-
tt := tt
248-
249243
t.Run(tt.name, func(t *testing.T) {
250244
var actual OCR1Config
251245
err := actual.Scan([]byte(tt.give))
@@ -391,7 +385,6 @@ func Test_JobProposal_CanEditDefinition(t *testing.T) {
391385
}
392386

393387
for _, tc := range tests {
394-
tc := tc
395388
t.Run(tc.name, func(t *testing.T) {
396389
t.Parallel()
397390

core/services/feeds/orm.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type ORM interface {
5757
UpdateSpecDefinition(ctx context.Context, id int64, spec string) error
5858

5959
IsJobManaged(ctx context.Context, jobID int64) (bool, error)
60+
IsJobManagedByFeedsManager(ctx context.Context, jobID int64, feedsManagerID int64) (bool, error)
6061

6162
Transact(context.Context, func(ORM) error) error
6263
WithDataSource(sqlutil.DataSource) ORM
@@ -387,7 +388,7 @@ func (o *orm) CountJobProposals(ctx context.Context) (count int64, err error) {
387388
// CountJobProposals counts the number of job proposal records.
388389
func (o *orm) CountJobProposalsByStatus(ctx context.Context) (counts *JobProposalCounts, err error) {
389390
stmt := `
390-
SELECT
391+
SELECT
391392
COUNT(*) filter (where job_proposals.status = 'pending' OR job_proposals.pending_update = TRUE) as pending,
392393
COUNT(*) filter (where job_proposals.status = 'approved' AND job_proposals.pending_update = FALSE) as approved,
393394
COUNT(*) filter (where job_proposals.status = 'rejected' AND job_proposals.pending_update = FALSE) as rejected,
@@ -878,3 +879,20 @@ SELECT exists (
878879
err = o.ds.GetContext(ctx, &exists, stmt, jobID)
879880
return exists, errors.Wrap(err, "IsJobManaged failed")
880881
}
882+
883+
// IsJobManagedByFeedsManager determines if a job is managed by a specific feeds manager.
884+
func (o *orm) IsJobManagedByFeedsManager(ctx context.Context, jobID int64, feedsManagerID int64) (exists bool, err error) {
885+
stmt := `
886+
SELECT exists (
887+
SELECT 1
888+
FROM job_proposals
889+
INNER JOIN jobs ON job_proposals.external_job_id = jobs.external_job_id
890+
WHERE jobs.id = $1
891+
AND job_proposals.feeds_manager_id = $2
892+
AND job_proposals.status <> 'deleted'
893+
);
894+
`
895+
896+
err = o.ds.GetContext(ctx, &exists, stmt, jobID, feedsManagerID)
897+
return exists, errors.Wrap(err, "IsJobManagedByFeedsManager failed")
898+
}

core/services/feeds/orm_test.go

Lines changed: 83 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -791,8 +791,6 @@ func Test_ORM_CountJobProposalsByStatus(t *testing.T) {
791791
}
792792

793793
for _, tc := range testCases {
794-
tc := tc
795-
796794
t.Run(tc.name, func(t *testing.T) {
797795
orm := setupORM(t)
798796

@@ -1067,8 +1065,6 @@ func Test_ORM_CancelSpec(t *testing.T) {
10671065
}
10681066

10691067
for _, tc := range testCases {
1070-
tc := tc
1071-
10721068
t.Run(tc.name, func(t *testing.T) {
10731069
ctx := testutils.Context(t)
10741070
orm := setupORM(t)
@@ -1229,8 +1225,6 @@ func Test_ORM_DeleteProposal(t *testing.T) {
12291225
}
12301226

12311227
for _, tc := range testCases {
1232-
tc := tc
1233-
12341228
t.Run(tc.name, func(t *testing.T) {
12351229
ctx := testutils.Context(t)
12361230
orm := setupORM(t)
@@ -1348,8 +1342,6 @@ func Test_ORM_RevokeSpec(t *testing.T) {
13481342
}
13491343

13501344
for _, tc := range testCases {
1351-
tc := tc
1352-
13531345
t.Run(tc.name, func(t *testing.T) {
13541346
ctx := testutils.Context(t)
13551347
orm := setupORM(t)
@@ -1597,8 +1589,6 @@ func Test_ORM_RejectSpec(t *testing.T) {
15971589
}
15981590

15991591
for _, tc := range testCases {
1600-
tc := tc
1601-
16021592
t.Run(tc.name, func(t *testing.T) {
16031593
ctx := testutils.Context(t)
16041594
orm := setupORM(t)
@@ -1693,6 +1683,88 @@ func Test_ORM_IsJobManaged(t *testing.T) {
16931683
assert.False(t, isManaged)
16941684
}
16951685

1686+
func Test_ORM_IsJobManagedByFeedsManager(t *testing.T) {
1687+
t.Parallel()
1688+
ctx := testutils.Context(t)
1689+
1690+
var (
1691+
orm = setupORM(t)
1692+
fmID1 = createFeedsManager(t, orm)
1693+
)
1694+
1695+
mgr2 := &feeds.FeedsManager{
1696+
URI: "http://192.168.0.2",
1697+
Name: "Chainlink FMS 2",
1698+
PublicKey: crypto.PublicKey([]byte("22222222222222222222222222222222")),
1699+
}
1700+
fmID2, err := orm.CreateManager(ctx, mgr2)
1701+
require.NoError(t, err)
1702+
1703+
var (
1704+
jpID1 = createJobProposal(t, orm, feeds.JobProposalStatusPending, fmID1)
1705+
jpID2 = createJobProposal(t, orm, feeds.JobProposalStatusPending, fmID2)
1706+
specID1 = createJobSpec(t, orm, jpID1)
1707+
specID2 = createJobSpec(t, orm, jpID2)
1708+
externalJobID1 = uuid.NullUUID{UUID: uuid.New(), Valid: true}
1709+
externalJobID2 = uuid.NullUUID{UUID: uuid.New(), Valid: true}
1710+
)
1711+
1712+
j1 := createJob(t, orm.db, externalJobID1.UUID)
1713+
j2 := createJob(t, orm.db, externalJobID2.UUID)
1714+
1715+
isManaged, err := orm.IsJobManagedByFeedsManager(ctx, int64(j1.ID), fmID1)
1716+
require.NoError(t, err)
1717+
assert.False(t, isManaged)
1718+
1719+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j1.ID), fmID2)
1720+
require.NoError(t, err)
1721+
assert.False(t, isManaged)
1722+
1723+
err = orm.ApproveSpec(ctx, specID1, externalJobID1.UUID)
1724+
require.NoError(t, err)
1725+
1726+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j1.ID), fmID1)
1727+
require.NoError(t, err)
1728+
assert.True(t, isManaged)
1729+
1730+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j1.ID), fmID2)
1731+
require.NoError(t, err)
1732+
assert.False(t, isManaged)
1733+
1734+
err = orm.ApproveSpec(ctx, specID2, externalJobID2.UUID)
1735+
require.NoError(t, err)
1736+
1737+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j2.ID), fmID2)
1738+
require.NoError(t, err)
1739+
assert.True(t, isManaged)
1740+
1741+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j2.ID), fmID1)
1742+
require.NoError(t, err)
1743+
assert.False(t, isManaged)
1744+
1745+
nonExistentJobID := int64(99998)
1746+
nonExistentFeedsManagerID := int64(99999)
1747+
1748+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, nonExistentJobID, fmID1)
1749+
require.NoError(t, err)
1750+
assert.False(t, isManaged)
1751+
1752+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j1.ID), nonExistentFeedsManagerID)
1753+
require.NoError(t, err)
1754+
assert.False(t, isManaged)
1755+
1756+
err = orm.DeleteProposal(ctx, jpID1)
1757+
require.NoError(t, err)
1758+
1759+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j1.ID), fmID1)
1760+
require.NoError(t, err)
1761+
assert.False(t, isManaged)
1762+
1763+
isManaged, err = orm.IsJobManagedByFeedsManager(ctx, int64(j2.ID), fmID2)
1764+
require.NoError(t, err)
1765+
assert.True(t, isManaged)
1766+
}
1767+
16961768
// Helpers
16971769

16981770
func assertChainConfigEqual(t *testing.T, want map[string]interface{}, actual feeds.ChainConfig) {
@@ -1758,6 +1830,7 @@ func createJob(t *testing.T, db *sqlx.DB, externalJobID uuid.UUID) *job.Job {
17581830
testspecs.GenerateOCRSpec(testspecs.OCRSpecParams{
17591831
JobID: externalJobID.String(),
17601832
TransmitterAddress: address.Hex(),
1833+
ContractAddress: testutils.NewAddress().Hex(),
17611834
DS1BridgeName: bridge.Name.String(),
17621835
DS2BridgeName: bridge2.Name.String(),
17631836
}).Toml(),

0 commit comments

Comments
 (0)