Skip to content

Commit

Permalink
simplify build context update logic
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 15, 2024
1 parent 4860662 commit 9296515
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ const (
func TestBuilds(t *testing.T) {
sch := generateInitialSchema(t)
in.Run(t,
in.WithLanguages("go", "java"),
in.WithLanguages("go"),
in.WithoutController(),
in.CopyModule(MODULE_NAME),
startPlugin(),
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestBuildsWhenAlreadyLocked(t *testing.T) {
sch := generateInitialSchema(t)

in.Run(t,
in.WithLanguages("go", "java"),
in.WithLanguages("go"),
in.WithoutController(),
in.CopyModule(MODULE_NAME),
startPlugin(),
Expand Down
8 changes: 0 additions & 8 deletions jvm-runtime/plugin/common/java_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ func TestExtractModuleDepsKotlin(t *testing.T) {
}

func TestJavaConfigDefaults(t *testing.T) {
watch := []string{
"pom.xml",
"src/**",
"build/generated",
"target/generated-sources",
}
for _, tt := range []struct {
language string
dir string
Expand All @@ -40,7 +34,6 @@ func TestJavaConfigDefaults(t *testing.T) {
DevModeBuild: optional.Some("mvn quarkus:dev"),
DeployDir: "target",
GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"),
Watch: watch,
LanguageConfig: map[string]any{
"build-tool": "maven",
},
Expand All @@ -54,7 +47,6 @@ func TestJavaConfigDefaults(t *testing.T) {
DevModeBuild: optional.Some("mvn quarkus:dev"),
DeployDir: "target",
GeneratedSchemaDir: optional.Some("src/main/ftl-module-schema"),
Watch: watch,
LanguageConfig: map[string]any{
"build-tool": "maven",
},
Expand Down
185 changes: 19 additions & 166 deletions jvm-runtime/plugin/common/jvmcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,16 @@ import (
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/sha256"
islices "github.com/TBD54566975/ftl/internal/slices"
"github.com/TBD54566975/ftl/internal/watch"
)

const BuildLockTimeout = time.Minute
const SchemaFile = "schema.pb"
const ErrorFile = "errors.pb"

//sumtype:decl
type updateEvent interface{ updateEvent() }

type buildContextUpdatedEvent struct {
buildCtx buildContext
}

func (buildContextUpdatedEvent) updateEvent() {}

type filesUpdatedEvent struct{}

func (filesUpdatedEvent) updateEvent() {}

// buildContext contains contextual information needed to build.
type buildContext struct {
ID string
Expand All @@ -84,7 +74,7 @@ func buildContextFromProto(proto *langpb.BuildContext) (buildContext, error) {
}

type Service struct {
updatesTopic *pubsub.Topic[updateEvent]
updatesTopic *pubsub.Topic[buildContextUpdatedEvent]
acceptsContextUpdates atomic.Value[bool]
scaffoldFiles *zip.Reader
}
Expand All @@ -93,7 +83,7 @@ var _ langconnect.LanguageServiceHandler = &Service{}

func New(scaffoldFiles *zip.Reader) *Service {
return &Service{
updatesTopic: pubsub.New[updateEvent](),
updatesTopic: pubsub.New[buildContextUpdatedEvent](),
scaffoldFiles: scaffoldFiles,
}
}
Expand Down Expand Up @@ -175,73 +165,30 @@ func (s *Service) Build(ctx context.Context, req *connect.Request[langpb.BuildRe
if req.Msg.RebuildAutomatically {
return s.handleDevModeRequest(ctx, req, stream)
}
events := make(chan updateEvent, 32)
s.updatesTopic.Subscribe(events)
defer s.updatesTopic.Unsubscribe(events)

// cancel context when stream ends so that watcher can be stopped
ctx, cancel := context.WithCancel(ctx)
defer cancel()

buildCtx, err := buildContextFromProto(req.Msg.BuildContext)
if err != nil {
return err
}

watchPatterns, err := relativeWatchPatterns(buildCtx.Config.Dir, buildCtx.Config.Watch)
if err != nil {
return err
}

watcher := watch.NewWatcher(watchPatterns...)
if req.Msg.RebuildAutomatically {
s.acceptsContextUpdates.Store(true)
defer s.acceptsContextUpdates.Store(false)

if err := watchFiles(ctx, watcher, buildCtx, events); err != nil {
return err
}
}

// Initial build
if err := buildAndSend(ctx, stream, buildCtx, false, watcher.GetTransaction(buildCtx.Config.Dir)); err != nil {
if err := buildAndSend(ctx, stream, buildCtx, false); err != nil {
return err
}
if !req.Msg.RebuildAutomatically {
return nil
}

// Watch for changes and build as needed
for {
select {
case e := <-events:
var isAutomaticRebuild bool
buildCtx, isAutomaticRebuild = buildContextFromPendingEvents(ctx, buildCtx, events, e)
if isAutomaticRebuild {
err = stream.Send(&langpb.BuildEvent{
Event: &langpb.BuildEvent_AutoRebuildStarted{
AutoRebuildStarted: &langpb.AutoRebuildStarted{
ContextId: buildCtx.ID,
},
},
})
if err != nil {
return fmt.Errorf("could not send auto rebuild started event: %w", err)
}
}
if err = buildAndSend(ctx, stream, buildCtx, isAutomaticRebuild, watcher.GetTransaction(buildCtx.Config.Dir)); err != nil {
return err
}
case <-ctx.Done():
log.FromContext(ctx).Infof("Build call ending - ctx cancelled")
return nil
}
}
return nil
}

func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request[langpb.BuildRequest], stream *connect.ServerStream[langpb.BuildEvent]) error {
logger := log.FromContext(ctx)

// cancel context when stream ends so that watcher can be stopped
ctx, cancel := context.WithCancel(ctx)
defer cancel()

events := make(chan buildContextUpdatedEvent, 32)
s.updatesTopic.Subscribe(events)
defer s.updatesTopic.Unsubscribe(events)
first := true
buildCtx, err := buildContextFromProto(req.Msg.BuildContext)
if err != nil {
Expand All @@ -263,7 +210,6 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
errorHash := sha256.SHA256{}
schemaHash := sha256.SHA256{}

ctx, cancel := context.WithCancel(context.Background())
ctx = log.ContextWithLogger(ctx, logger)
bind := fmt.Sprintf("http://localhost:%d", address.Port)
go func() {
Expand All @@ -287,6 +233,8 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
select {
case <-ctx.Done():
return nil
case bc := <-events:
buildCtx = bc.buildCtx
case <-schemaChangeTicker.C:
select {
// We only force a reload every second, but we check for schema changes every 100ms
Expand Down Expand Up @@ -387,21 +335,13 @@ func (s *Service) handleDevModeRequest(ctx context.Context, req *connect.Request
}
}

func build(ctx context.Context, bctx buildContext, autoRebuild bool, transaction watch.ModifyFilesTransaction) (*langpb.BuildEvent, error) {
func build(ctx context.Context, bctx buildContext, autoRebuild bool) (*langpb.BuildEvent, error) {
logger := log.FromContext(ctx)
release, err := flock.Acquire(ctx, bctx.Config.BuildLock, BuildLockTimeout)
if err != nil {
return nil, fmt.Errorf("could not acquire build lock: %w", err)
}
defer release() //nolint:errcheck
if err := transaction.Begin(); err != nil {
return nil, fmt.Errorf("could not start a file transaction: %w", err)
}
defer func() {
if terr := transaction.End(); terr != nil {
logger.Errorf(terr, "failed to end file transaction")
}
}()

deps, err := extractDependencies(bctx.Config.Module, bctx.Config.Dir)
if err != nil {
Expand All @@ -426,7 +366,7 @@ func build(ctx context.Context, bctx buildContext, autoRebuild bool, transaction
return nil, fmt.Errorf("failed to build module %q: %w", config.Module, err)
}
if javaConfig.BuildTool == JavaBuildToolMaven {
if err := setPOMProperties(ctx, config.Dir, transaction); err != nil {
if err := setPOMProperties(ctx, config.Dir); err != nil {
// This is not a critical error, things will probably work fine
// TBH updating the pom is maybe not the best idea anyway
logger.Warnf("unable to update ftl.version in %s: %s", config.Dir, err.Error())
Expand Down Expand Up @@ -512,84 +452,12 @@ func (s *Service) BuildContextUpdated(ctx context.Context, req *connect.Request[
return connect.NewResponse(&langpb.BuildContextUpdatedResponse{}), nil
}

func watchFiles(ctx context.Context, watcher *watch.Watcher, buildCtx buildContext, events chan updateEvent) error {
watchTopic, err := watcher.Watch(ctx, time.Second, []string{buildCtx.Config.Dir})
if err != nil {
return fmt.Errorf("could not watch for file changes: %w", err)
}
log.FromContext(ctx).Debugf("Watching for file changes: %s", buildCtx.Config.Dir)
watchEvents := make(chan watch.WatchEvent, 32)
watchTopic.Subscribe(watchEvents)

// We need watcher to calculate file hashes before we do initial build so we can detect changes
select {
case e := <-watchEvents:
_, ok := e.(watch.WatchEventModuleAdded)
if !ok {
return fmt.Errorf("expected module added event, got: %T", e)
}
case <-time.After(3 * time.Second):
return fmt.Errorf("expected module added event, got no event")
case <-ctx.Done():
return fmt.Errorf("context done: %w", ctx.Err())
}

go func() {
for {
select {
case e := <-watchEvents:
if change, ok := e.(watch.WatchEventModuleChanged); ok {
log.FromContext(ctx).Infof("Found file changes: %s", change)
events <- filesUpdatedEvent{}
}

case <-ctx.Done():
return
}
}
}()
return nil
}

// buildContextFromPendingEvents processes all pending events to determine the latest context and whether the build is automatic.
func buildContextFromPendingEvents(ctx context.Context, buildCtx buildContext, events chan updateEvent, firstEvent updateEvent) (newBuildCtx buildContext, isAutomaticRebuild bool) {
allEvents := []updateEvent{firstEvent}
// find any other events in the queue
for {
select {
case e := <-events:
allEvents = append(allEvents, e)
case <-ctx.Done():
return buildCtx, false
default:
// No more events waiting to be processed
hasExplicitBuilt := false
for _, e := range allEvents {
switch e := e.(type) {
case buildContextUpdatedEvent:
buildCtx = e.buildCtx
hasExplicitBuilt = true
case filesUpdatedEvent:
}

}
switch e := firstEvent.(type) {
case buildContextUpdatedEvent:
buildCtx = e.buildCtx
hasExplicitBuilt = true
case filesUpdatedEvent:
}
return buildCtx, !hasExplicitBuilt
}
}
}

// buildAndSend builds the module and sends the build event to the stream.
//
// Build errors are sent over the stream as a BuildFailure event.
// This function only returns an error if events could not be send over the stream.
func buildAndSend(ctx context.Context, stream *connect.ServerStream[langpb.BuildEvent], buildCtx buildContext, isAutomaticRebuild bool, transaction watch.ModifyFilesTransaction) error {
buildEvent, err := build(ctx, buildCtx, isAutomaticRebuild, transaction)
func buildAndSend(ctx context.Context, stream *connect.ServerStream[langpb.BuildEvent], buildCtx buildContext, isAutomaticRebuild bool) error {
buildEvent, err := build(ctx, buildCtx, isAutomaticRebuild)
if err != nil {
buildEvent = buildFailure(buildCtx, isAutomaticRebuild, builderrors.Error{
Type: builderrors.FTL,
Expand Down Expand Up @@ -617,18 +485,6 @@ func buildFailure(buildCtx buildContext, isAutomaticRebuild bool, errs ...builde
}
}

func relativeWatchPatterns(moduleDir string, watchPaths []string) ([]string, error) {
relativePaths := make([]string, len(watchPaths))
for i, path := range watchPaths {
relative, err := filepath.Rel(moduleDir, path)
if err != nil {
return nil, fmt.Errorf("could create relative path for watch pattern: %w", err)
}
relativePaths[i] = relative
}
return relativePaths, nil
}

const JavaBuildToolMaven string = "maven"
const JavaBuildToolGradle string = "gradle"

Expand All @@ -648,9 +504,7 @@ func loadJavaConfig(languageConfig any, language string) (JavaConfig, error) {
func (s *Service) ModuleConfigDefaults(ctx context.Context, req *connect.Request[langpb.ModuleConfigDefaultsRequest]) (*connect.Response[langpb.ModuleConfigDefaultsResponse], error) {
defaults := langpb.ModuleConfigDefaultsResponse{
GeneratedSchemaDir: ptr("src/main/ftl-module-schema"),
// Watch defaults to files related to maven and gradle
Watch: []string{"pom.xml", "src/**", "build/generated", "target/generated-sources"},
LanguageConfig: &structpb.Struct{Fields: map[string]*structpb.Value{}},
LanguageConfig: &structpb.Struct{Fields: map[string]*structpb.Value{}},
}
dir := req.Msg.Dir
pom := filepath.Join(dir, "pom.xml")
Expand Down Expand Up @@ -776,7 +630,7 @@ func extractKotlinFTLImports(self, dir string) ([]string, error) {

// setPOMProperties updates the ftl.version properties in the
// pom.xml file in the given base directory.
func setPOMProperties(ctx context.Context, baseDir string, transaction watch.ModifyFilesTransaction) error {
func setPOMProperties(ctx context.Context, baseDir string) error {
logger := log.FromContext(ctx)
ftlVersion := ftl.Version
// If we are running in dev mode, ftl.Version will be "dev"
Expand Down Expand Up @@ -821,7 +675,6 @@ func setPOMProperties(ctx context.Context, baseDir string, transaction watch.Mod
if err != nil {
return fmt.Errorf("unable to write %s: %w", pomFile, err)
}
err = transaction.ModifiedFiles(pomFile)
if err != nil {
return fmt.Errorf("could not mark %s as modified: %w", pomFile, err)
}
Expand Down

0 comments on commit 9296515

Please sign in to comment.