Skip to content

Commit

Permalink
revert breaking API changes, wrap CodecP internally
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Jan 7, 2024
1 parent d7194bc commit c1f7942
Show file tree
Hide file tree
Showing 25 changed files with 74 additions and 130 deletions.
33 changes: 29 additions & 4 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,42 @@
package goka

import "io"
import (
"io"

"github.com/lovoo/goka/codec"
)

// Codec decodes and encodes from and to []byte
type Codec interface {
Encode(value interface{}) (data []byte, err error)
Decode(data []byte) (value interface{}, err error)
}

type CodecP interface {
Codec

DecodeP(data []byte) (value interface{}, closer io.Closer, err error)
}

// FuncCloser implements io.Closer-interface for convenience
type FuncCloser func() error
// CloserFunc implements io.Closer-interface for convenience when wrapping functions
type CloserFunc func() error

func (f FuncCloser) Close() error {
func (f CloserFunc) Close() error {
return f()
}

type codecWrapper struct {
Codec
}

func (cw *codecWrapper) DecodeP(data []byte) (value interface{}, closer io.Closer, err error) {
val, err := cw.Codec.Decode(data)
return val, codec.NoopCloser, err
}

func convertOrFakeCodec(c Codec) CodecP {
if cp, ok := c.(CodecP); ok {
return cp
}
return &codecWrapper{Codec: c}
}
6 changes: 2 additions & 4 deletions codec/closer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package codec

type Closer interface {
Close()
}

type nullCloser struct{}

func (n *nullCloser) Close() error { return nil }

// NoopCloser can be used for returning io.Closer interfaces, whose Close call does
// nothing.
var NoopCloser = new(nullCloser)
15 changes: 0 additions & 15 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package codec

import (
"fmt"
"io"
"strconv"
)

Expand All @@ -24,11 +23,6 @@ func (d *Bytes) Decode(data []byte) (interface{}, error) {
return data, nil
}

// Decode of defaultCodec simply returns the data
func (d *Bytes) DecodeP(data []byte) (interface{}, io.Closer, error) {
return data, NoopCloser, nil
}

// String is a commonly used codec to encode and decode string <-> []byte
type String struct{}

Expand All @@ -46,10 +40,6 @@ func (c *String) Decode(data []byte) (interface{}, error) {
return string(data), nil
}

func (c *String) DecodeP(data []byte) (interface{}, io.Closer, error) {
return string(data), NoopCloser, nil
}

// Int64 is a commonly used codec to encode and decode string <-> []byte
type Int64 struct{}

Expand All @@ -70,8 +60,3 @@ func (c *Int64) Decode(data []byte) (interface{}, error) {
}
return intVal, nil
}

func (c *Int64) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, NoopCloser, err
}
6 changes: 0 additions & 6 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
Expand Down Expand Up @@ -59,11 +58,6 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic,
new(codec.String))
Expand Down
12 changes: 0 additions & 12 deletions examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package blocker
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

Expand All @@ -31,11 +29,6 @@ func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockEventCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type BlockValue struct {
Blocked bool
}
Expand All @@ -50,11 +43,6 @@ func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *BlockValueCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func block(ctx goka.Context, msg interface{}) {
var s *BlockValue
if v := ctx.Value(); v == nil {
Expand Down
7 changes: 0 additions & 7 deletions examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package collector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)
Expand All @@ -30,11 +28,6 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func collect(ctx goka.Context, msg interface{}) {
var ml []messaging.Message
if v := ctx.Value(); v != nil {
Expand Down
7 changes: 0 additions & 7 deletions examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package detector
import (
"context"
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
Expand Down Expand Up @@ -35,11 +33,6 @@ func (c *CountersCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *CountersCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

func getValue(ctx goka.Context) *Counters {
if v := ctx.Value(); v != nil {
return v.(*Counters)
Expand Down
12 changes: 0 additions & 12 deletions examples/3-messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package messaging

import (
"encoding/json"
"io"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
Expand All @@ -30,11 +28,6 @@ func (c *MessageCodec) Decode(data []byte) (interface{}, error) {
return &m, json.Unmarshal(data, &m)
}

func (c *MessageCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}

type MessageListCodec struct{}

func (c *MessageListCodec) Encode(value interface{}) ([]byte, error) {
Expand All @@ -46,8 +39,3 @@ func (c *MessageListCodec) Decode(data []byte) (interface{}, error) {
err := json.Unmarshal(data, &m)
return m, err
}

func (c *MessageListCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
6 changes: 0 additions & 6 deletions examples/5-multiple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -68,11 +67,6 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter(ctx context.Context) (rerr error) {
emitterA, err := goka.NewEmitter(brokers, inputA, new(codec.String))
if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions examples/7-redis/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package main

import (
"encoding/json"
"io"

"github.com/lovoo/goka/codec"
)

type Codec struct{}
Expand All @@ -21,8 +18,3 @@ func (c *Codec) Decode(data []byte) (interface{}, error) {
err := json.Unmarshal(data, event)
return event, err
}

func (c *Codec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := c.Decode(data)
return dec, codec.NoopCloser, err
}
6 changes: 0 additions & 6 deletions examples/8-monitoring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -64,11 +63,6 @@ func (jc *userCodec) Decode(data []byte) (interface{}, error) {
return &c, nil
}

func (jc *userCodec) DecodeP(data []byte) (interface{}, io.Closer, error) {
dec, err := jc.Decode(data)
return dec, codec.NoopCloser, err
}

func runEmitter(ctx context.Context) (rerr error) {
emitter, err := goka.NewEmitter(brokers, topic,
new(codec.String))
Expand Down
Loading

0 comments on commit c1f7942

Please sign in to comment.